diff --git a/Cargo.lock b/Cargo.lock
index 812b825cc5..0c37dad82e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2832,6 +2832,7 @@ dependencies = [
name = "influxdb3_catalog"
version = "0.1.0"
dependencies = [
+ "anyhow",
"arrow",
"bimap",
"chrono",
@@ -3202,6 +3203,7 @@ dependencies = [
"influxdb3_id",
"influxdb3_internal_api",
"influxdb3_py_api",
+ "influxdb3_sys_events",
"influxdb3_telemetry",
"influxdb3_test_helpers",
"influxdb3_wal",
diff --git a/deny.toml b/deny.toml
index fd8625b5ae..547047ed0b 100644
--- a/deny.toml
+++ b/deny.toml
@@ -1,7 +1,8 @@
# Configuration documentation:
-# https://embarkstudios.github.io/cargo-deny/index.html
+# https://embarkstudios.github.io/cargo-deny/index.html
[advisories]
+version = 2
yanked = "deny"
ignore = [
# dependent on datafusion-common moving away from instant
@@ -11,6 +12,8 @@ ignore = [
git-fetch-with-cli = true
[licenses]
+version = 2
+unused-allowed-license = "warn"
allow = [
"Apache-2.0",
"BSD-2-Clause",
@@ -22,7 +25,6 @@ allow = [
"Unicode-3.0",
"Zlib",
]
-
exceptions = [
# We should probably NOT bundle CA certs but use the OS ones.
{ name = "webpki-roots", allow = ["MPL-2.0"] },
@@ -41,10 +43,10 @@ license-files = [
github = ["influxdata"]
[bans]
-multiple-versions = "warn"
+multiple-versions = "allow"
deny = [
- # We are using rustls as the TLS implementation, so we shouldn't be linking
- # in OpenSSL too.
+ # We are using rustls as the TLS implementation, so we shouldn't be linking
+ # in OpenSSL too.
#
# If you're hitting this, you might want to take a look at what new
# dependencies you have introduced and check if there's a way to depend on
@@ -53,4 +55,6 @@ deny = [
# We've decided to use the `humantime` crate to parse and generate friendly time formats; use
# that rather than chrono-english.
{ name = "chrono-english" },
+ # Use stdlib ( https://doc.rust-lang.org/stable/std/io/trait.IsTerminal.html )
+ { name = "atty" },
]
diff --git a/influxdb3_cache/src/distinct_cache/cache.rs b/influxdb3_cache/src/distinct_cache/cache.rs
index bb1f24079d..62c8793aaa 100644
--- a/influxdb3_cache/src/distinct_cache/cache.rs
+++ b/influxdb3_cache/src/distinct_cache/cache.rs
@@ -473,7 +473,7 @@ impl Node {
let block = builder.append_block(value.0.as_bytes().into());
for _ in 0..count {
builder
- .try_append_view(block, 0u32, value.0.as_bytes().len() as u32)
+ .try_append_view(block, 0u32, value.0.len() as u32)
.expect("append view for known valid block, offset and length");
}
}
diff --git a/influxdb3_catalog/Cargo.toml b/influxdb3_catalog/Cargo.toml
index b4253fd8ec..a87e48b1e2 100644
--- a/influxdb3_catalog/Cargo.toml
+++ b/influxdb3_catalog/Cargo.toml
@@ -17,6 +17,7 @@ influxdb3_id = { path = "../influxdb3_id" }
influxdb3_wal = { path = "../influxdb3_wal" }
# crates.io dependencies
+anyhow.workspace = true
arrow.workspace = true
bimap.workspace = true
chrono.workspace = true
diff --git a/influxdb3_clap_blocks/src/datafusion.rs b/influxdb3_clap_blocks/src/datafusion.rs
index 01fc7b9a40..29e7c37fda 100644
--- a/influxdb3_clap_blocks/src/datafusion.rs
+++ b/influxdb3_clap_blocks/src/datafusion.rs
@@ -4,7 +4,7 @@ use datafusion::config::ConfigExtension;
use iox_query::config::IoxConfigExt;
/// Extends the standard [`HashMap`] based DataFusion config option in the CLI with specific
-/// options (along with defaults) for InfluxDB 3 OSS/Pro. This is intended for customization of
+/// options (along with defaults) for InfluxDB 3 Core/Enterprise. This is intended for customization of
/// options that are defined in the `iox_query` crate, e.g., those defined in [`IoxConfigExt`]
/// that are relevant to the monolithinc versions of InfluxDB 3.
#[derive(Debug, clap::Parser, Clone)]
diff --git a/influxdb3_client/src/lib.rs b/influxdb3_client/src/lib.rs
index 15014475e2..e51c343671 100644
--- a/influxdb3_client/src/lib.rs
+++ b/influxdb3_client/src/lib.rs
@@ -852,7 +852,7 @@ impl<'c> WriteRequestBuilder<'c, NoBody> {
}
}
-impl<'c> WriteRequestBuilder<'c, Body> {
+impl WriteRequestBuilder<'_, Body> {
/// Send the request to the server
pub async fn send(self) -> Result<()> {
let url = self.client.base_url.join("/api/v3/write_lp")?;
@@ -900,7 +900,7 @@ pub struct QueryRequestBuilder<'c> {
// TODO - for now the send method just returns the bytes from the response.
// It may be nicer to have the format parameter dictate how we return from
// send, e.g., using types more specific to the format selected.
-impl<'c> QueryRequestBuilder<'c> {
+impl QueryRequestBuilder<'_> {
/// Specify the format, `json`, `csv`, `pretty`, or `parquet`
pub fn format(mut self, format: Format) -> Self {
self.format = Some(format);
@@ -1101,7 +1101,7 @@ pub struct ShowDatabasesRequestBuilder<'c> {
show_deleted: bool,
}
-impl<'c> ShowDatabasesRequestBuilder<'c> {
+impl ShowDatabasesRequestBuilder<'_> {
/// Specify whether or not to show deleted databases in the output
pub fn with_show_deleted(mut self, show_deleted: bool) -> Self {
self.show_deleted = show_deleted;
diff --git a/influxdb3_internal_api/src/query_executor.rs b/influxdb3_internal_api/src/query_executor.rs
index 05266d45f5..6a44966b7c 100644
--- a/influxdb3_internal_api/src/query_executor.rs
+++ b/influxdb3_internal_api/src/query_executor.rs
@@ -24,6 +24,8 @@ pub enum QueryExecutorError {
DatabasesToRecordBatch(#[source] ArrowError),
#[error("unable to compose record batches from retention policies: {0}")]
RetentionPoliciesToRecordBatch(#[source] ArrowError),
+ #[error("invokded a method that is not implemented: {0}")]
+ MethodNotImplemented(&'static str),
}
#[async_trait]
diff --git a/influxdb3_server/src/http.rs b/influxdb3_server/src/http.rs
index 81508ab73e..ebe00432f3 100644
--- a/influxdb3_server/src/http.rs
+++ b/influxdb3_server/src/http.rs
@@ -247,6 +247,10 @@ impl Error {
fn into_response(self) -> Response
{
debug!(error = ?self, "API error");
match self {
+ Self::Query(err @ QueryExecutorError::MethodNotImplemented(_)) => Response::builder()
+ .status(StatusCode::METHOD_NOT_ALLOWED)
+ .body(Body::from(err.to_string()))
+ .unwrap(),
Self::WriteBuffer(err @ WriteBufferError::DatabaseNotFound { db_name: _ }) => {
Response::builder()
.status(StatusCode::NOT_FOUND)
@@ -1254,7 +1258,7 @@ fn json_content_type(headers: &HeaderMap) -> bool {
};
let is_json_content_type = mime.type_() == "application"
- && (mime.subtype() == "json" || mime.suffix().map_or(false, |name| name == "json"));
+ && (mime.subtype() == "json" || mime.suffix().is_some_and(|name| name == "json"));
is_json_content_type
}
@@ -1329,7 +1333,7 @@ fn validate_db_name(name: &str, accept_rp: bool) -> Result<(), ValidateDbNameErr
let mut rp_seperator_found = false;
let mut last_char = None;
for grapheme in name.graphemes(true) {
- if grapheme.as_bytes().len() > 1 {
+ if grapheme.len() > 1 {
// In the case of a unicode we need to handle multibyte chars
return Err(ValidateDbNameError::InvalidChar);
}
diff --git a/influxdb3_server/src/lib.rs b/influxdb3_server/src/lib.rs
index ef4c6164a0..79305035d7 100644
--- a/influxdb3_server/src/lib.rs
+++ b/influxdb3_server/src/lib.rs
@@ -1,4 +1,4 @@
-//! InfluxDB 3 Core server implementation
+//! InfluxDB 3 Enterprise server implementation
//!
//! The server is responsible for handling the HTTP API
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
@@ -789,7 +789,7 @@ mod tests {
let parquet_metrics_provider: Arc =
Arc::clone(&write_buffer_impl.persisted_files());
let sample_telem_store =
- TelemetryStore::new_without_background_runners(parquet_metrics_provider);
+ TelemetryStore::new_without_background_runners(Some(parquet_metrics_provider));
let write_buffer: Arc = write_buffer_impl;
let common_state = crate::CommonServerState::new(
Arc::clone(&metrics),
diff --git a/influxdb3_server/src/query_executor/mod.rs b/influxdb3_server/src/query_executor/mod.rs
index 3e92b3bead..8a5284fa00 100644
--- a/influxdb3_server/src/query_executor/mod.rs
+++ b/influxdb3_server/src/query_executor/mod.rs
@@ -133,55 +133,16 @@ impl QueryExecutor for QueryExecutorImpl {
db_name: database.to_string(),
})?;
- let params = params.unwrap_or_default();
-
- let token = db.record_query(
- external_span_ctx.as_ref().map(RequestLogContext::ctx),
- kind.query_type(),
- Box::new(query.to_string()),
- params.clone(),
- );
-
- // NOTE - we use the default query configuration on the IOxSessionContext here:
- let ctx = db.new_query_context(span_ctx, Default::default());
- let planner = Planner::new(&ctx);
- let query = query.to_string();
-
- // Perform query planning on a separate threadpool than the IO runtime that is servicing
- // this request by using `IOxSessionContext::run`.
- let plan = ctx
- .run(async move {
- match kind {
- QueryKind::Sql => planner.sql(query, params).await,
- QueryKind::InfluxQl => planner.influxql(query, params).await,
- }
- })
- .await;
-
- let plan = match plan.map_err(QueryExecutorError::QueryPlanning) {
- Ok(plan) => plan,
- Err(e) => {
- token.fail();
- return Err(e);
- }
- };
- let token = token.planned(&ctx, Arc::clone(&plan));
-
- // TODO: Enforce concurrency limit here
- let token = token.permit();
-
- self.telemetry_store.update_num_queries();
-
- match ctx.execute_stream(Arc::clone(&plan)).await {
- Ok(query_results) => {
- token.success();
- Ok(query_results)
- }
- Err(err) => {
- token.fail();
- Err(QueryExecutorError::ExecuteStream(err))
- }
- }
+ query_database(
+ db,
+ query,
+ params,
+ kind,
+ span_ctx,
+ external_span_ctx,
+ Arc::clone(&self.telemetry_store),
+ )
+ .await
}
fn show_databases(
@@ -271,6 +232,66 @@ impl QueryExecutor for QueryExecutorImpl {
}
}
+async fn query_database(
+ db: Arc,
+ query: &str,
+ params: Option,
+ kind: QueryKind,
+ span_ctx: Option,
+ external_span_ctx: Option,
+ telemetry_store: Arc,
+) -> Result {
+ let params = params.unwrap_or_default();
+
+ let token = db.record_query(
+ external_span_ctx.as_ref().map(RequestLogContext::ctx),
+ kind.query_type(),
+ Box::new(query.to_string()),
+ params.clone(),
+ );
+
+ // NOTE - we use the default query configuration on the IOxSessionContext here:
+ let ctx = db.new_query_context(span_ctx, Default::default());
+ let planner = Planner::new(&ctx);
+ let query = query.to_string();
+
+ // Perform query planning on a separate threadpool than the IO runtime that is servicing
+ // this request by using `IOxSessionContext::run`.
+ let plan = ctx
+ .run(async move {
+ match kind {
+ QueryKind::Sql => planner.sql(query, params).await,
+ QueryKind::InfluxQl => planner.influxql(query, params).await,
+ }
+ })
+ .await;
+
+ let plan = match plan.map_err(QueryExecutorError::QueryPlanning) {
+ Ok(plan) => plan,
+ Err(e) => {
+ token.fail();
+ return Err(e);
+ }
+ };
+ let token = token.planned(&ctx, Arc::clone(&plan));
+
+ // TODO: Enforce concurrency limit here
+ let token = token.permit();
+
+ telemetry_store.update_num_queries();
+
+ match ctx.execute_stream(Arc::clone(&plan)).await {
+ Ok(query_results) => {
+ token.success();
+ Ok(query_results)
+ }
+ Err(err) => {
+ token.fail();
+ Err(QueryExecutorError::ExecuteStream(err))
+ }
+ }
+}
+
#[derive(Debug)]
struct RetentionPolicyRow {
database: String,
@@ -351,21 +372,18 @@ impl QueryDatabase for QueryExecutorImpl {
db_name: name.into(),
}))
})?;
- Ok(Some(Arc::new(Database::new(
+ Ok(Some(Arc::new(Database::new(CreateDatabaseArgs {
db_schema,
- Arc::clone(&self.write_buffer),
- Arc::clone(&self.exec),
- Arc::clone(&self.datafusion_config),
- Arc::clone(&self.query_log),
- Arc::clone(&self.sys_events_store),
- ))))
+ write_buffer: Arc::clone(&self.write_buffer),
+ exec: Arc::clone(&self.exec),
+ datafusion_config: Arc::clone(&self.datafusion_config),
+ query_log: Arc::clone(&self.query_log),
+ sys_events_store: Arc::clone(&self.sys_events_store),
+ }))))
}
async fn acquire_semaphore(&self, span: Option) -> InstrumentedAsyncOwnedSemaphorePermit {
- Arc::clone(&self.query_execution_semaphore)
- .acquire_owned(span)
- .await
- .expect("Semaphore should not be closed by anyone")
+ acquire_semaphore(Arc::clone(&self.query_execution_semaphore), span).await
}
fn query_log(&self) -> QueryLogEntries {
@@ -373,6 +391,27 @@ impl QueryDatabase for QueryExecutorImpl {
}
}
+async fn acquire_semaphore(
+ semaphore: Arc,
+ span: Option,
+) -> InstrumentedAsyncOwnedSemaphorePermit {
+ semaphore
+ .acquire_owned(span)
+ .await
+ .expect("Semaphore should not be closed by anyone")
+}
+
+/// Arguments for [`Database::new`]
+#[derive(Debug)]
+pub struct CreateDatabaseArgs {
+ db_schema: Arc,
+ write_buffer: Arc,
+ exec: Arc,
+ datafusion_config: Arc>,
+ query_log: Arc,
+ sys_events_store: Arc,
+}
+
#[derive(Debug, Clone)]
pub struct Database {
db_schema: Arc,
@@ -385,12 +424,14 @@ pub struct Database {
impl Database {
pub fn new(
- db_schema: Arc,
- write_buffer: Arc,
- exec: Arc,
- datafusion_config: Arc>,
- query_log: Arc,
- sys_events_store: Arc,
+ CreateDatabaseArgs {
+ db_schema,
+ write_buffer,
+ exec,
+ datafusion_config,
+ query_log,
+ sys_events_store,
+ }: CreateDatabaseArgs,
) -> Self {
let system_schema_provider = Arc::new(SystemSchemaProvider::AllSystemSchemaTables(
AllSystemSchemaTablesProvider::new(
@@ -626,6 +667,7 @@ impl TableProvider for QueryTable {
provider.scan(ctx, projection, &filters, limit).await
}
}
+
#[cfg(test)]
mod tests {
use std::{num::NonZeroUsize, sync::Arc, time::Duration};
@@ -679,7 +721,12 @@ mod tests {
))
}
- async fn setup() -> (Arc, QueryExecutorImpl, Arc) {
+ pub(crate) async fn setup() -> (
+ Arc,
+ QueryExecutorImpl,
+ Arc,
+ Arc,
+ ) {
// Set up QueryExecutor
let object_store: Arc =
Arc::new(LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap());
@@ -718,7 +765,7 @@ mod tests {
.unwrap();
let persisted_files: Arc = Arc::clone(&write_buffer_impl.persisted_files());
- let telemetry_store = TelemetryStore::new_without_background_runners(persisted_files);
+ let telemetry_store = TelemetryStore::new_without_background_runners(Some(persisted_files));
let sys_events_store = Arc::new(SysEventStore::new(Arc::::clone(
&time_provider,
)));
@@ -733,15 +780,20 @@ mod tests {
datafusion_config,
query_log_size: 10,
telemetry_store,
- sys_events_store,
+ sys_events_store: Arc::clone(&sys_events_store),
});
- (write_buffer, query_executor, time_provider)
+ (
+ write_buffer,
+ query_executor,
+ time_provider,
+ sys_events_store,
+ )
}
#[test_log::test(tokio::test)]
async fn system_parquet_files_success() {
- let (write_buffer, query_executor, time_provider) = setup().await;
+ let (write_buffer, query_executor, time_provider, _) = setup().await;
// Perform some writes to multiple tables
let db_name = "test_db";
// perform writes over time to generate WAL files and some snapshots
diff --git a/influxdb3_telemetry/src/sampler.rs b/influxdb3_telemetry/src/sampler.rs
index 2f3b68f8a9..8d54ddce4e 100644
--- a/influxdb3_telemetry/src/sampler.rs
+++ b/influxdb3_telemetry/src/sampler.rs
@@ -125,7 +125,8 @@ mod tests {
#[test]
fn test_sample_all_metrics() {
let mut mock_sys_info_provider = MockSystemInfoProvider::new();
- let store = TelemetryStore::new_without_background_runners(Arc::from(MockParquetMetrics));
+ let store =
+ TelemetryStore::new_without_background_runners(Some(Arc::from(MockParquetMetrics)));
mock_sys_info_provider
.expect_get_pid()
@@ -145,7 +146,8 @@ mod tests {
#[test]
fn test_sample_all_metrics_with_call_failure() {
let mut mock_sys_info_provider = MockSystemInfoProvider::new();
- let store = TelemetryStore::new_without_background_runners(Arc::from(MockParquetMetrics));
+ let store =
+ TelemetryStore::new_without_background_runners(Some(Arc::from(MockParquetMetrics)));
mock_sys_info_provider
.expect_get_pid()
diff --git a/influxdb3_telemetry/src/store.rs b/influxdb3_telemetry/src/store.rs
index ff23c68e44..6b092631c7 100644
--- a/influxdb3_telemetry/src/store.rs
+++ b/influxdb3_telemetry/src/store.rs
@@ -71,7 +71,9 @@ impl TelemetryStore {
store
}
- pub fn new_without_background_runners(persisted_files: Arc) -> Arc {
+ pub fn new_without_background_runners(
+ persisted_files: Option>,
+ ) -> Arc {
let instance_id = Arc::from("sample-instance-id");
let os = Arc::from("Linux");
let influx_version = Arc::from("influxdb3-0.1.0");
@@ -80,7 +82,7 @@ impl TelemetryStore {
let inner = TelemetryStoreInner::new(instance_id, os, influx_version, storage_type, cores);
Arc::new(TelemetryStore {
inner: parking_lot::Mutex::new(inner),
- persisted_files: Some(persisted_files),
+ persisted_files,
})
}
@@ -191,7 +193,7 @@ impl TelemetryStoreInner {
instance_id: self.instance_id.clone(),
storage_type: self.storage_type.clone(),
cores: self.cores,
- product_type: "OSS",
+ product_type: "Core",
uptime_secs: self.start_timer.elapsed().as_secs(),
cpu_utilization_percent_min_1m: self.cpu.utilization.min,
@@ -316,7 +318,7 @@ mod tests {
let store: Arc = TelemetryStore::new(
Arc::from("some-instance-id"),
Arc::from("Linux"),
- Arc::from("OSS-v3.0"),
+ Arc::from("Core-v3.0"),
Arc::from("Memory"),
10,
Some(parqet_file_metrics),
diff --git a/influxdb3_wal/src/create.rs b/influxdb3_wal/src/create.rs
index 3521f53d15..b7d0d72dad 100644
--- a/influxdb3_wal/src/create.rs
+++ b/influxdb3_wal/src/create.rs
@@ -43,6 +43,19 @@ pub fn write_batch_op(write_batch: WriteBatch) -> WalOp {
WalOp::Write(write_batch)
}
+pub fn catalog_batch_op(
+ db_id: DbId,
+ db_name: impl Into>,
+ time_ns: i64,
+ ops: impl IntoIterator- ,
+ sequence_number: u32,
+) -> WalOp {
+ WalOp::Catalog(OrderedCatalogBatch::new(
+ catalog_batch(db_id, db_name, time_ns, ops),
+ sequence_number,
+ ))
+}
+
pub fn catalog_batch(
db_id: DbId,
db_name: impl Into>,
diff --git a/influxdb3_wal/src/lib.rs b/influxdb3_wal/src/lib.rs
index 17b40b5f93..6e4fbb94b8 100644
--- a/influxdb3_wal/src/lib.rs
+++ b/influxdb3_wal/src/lib.rs
@@ -16,7 +16,6 @@ use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId};
use influxdb_line_protocol::v3::SeriesValue;
use influxdb_line_protocol::FieldValue;
use iox_time::Time;
-use observability_deps::tracing::error;
use schema::{InfluxColumnType, InfluxFieldType};
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
@@ -320,6 +319,10 @@ impl OrderedCatalogBatch {
pub fn batch(&self) -> &CatalogBatch {
&self.catalog
}
+
+ pub fn into_batch(self) -> CatalogBatch {
+ self.catalog
+ }
}
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
diff --git a/influxdb3_wal/src/object_store.rs b/influxdb3_wal/src/object_store.rs
index 0d9947e4f3..5b8f61cbec 100644
--- a/influxdb3_wal/src/object_store.rs
+++ b/influxdb3_wal/src/object_store.rs
@@ -252,6 +252,7 @@ impl WalObjectStore {
.await
};
info!(
+ host = self.host_identifier_prefix,
n_ops = %wal_contents.ops.len(),
min_timestamp_ns = %wal_contents.min_timestamp_ns,
max_timestamp_ns = %wal_contents.max_timestamp_ns,
diff --git a/influxdb3_write/Cargo.toml b/influxdb3_write/Cargo.toml
index 5606d72c5a..7039f6402e 100644
--- a/influxdb3_write/Cargo.toml
+++ b/influxdb3_write/Cargo.toml
@@ -32,6 +32,7 @@ influxdb3_internal_api = { path = "../influxdb3_internal_api" }
influxdb3_test_helpers = { path = "../influxdb3_test_helpers" }
influxdb3_wal = { path = "../influxdb3_wal" }
influxdb3_telemetry = { path = "../influxdb3_telemetry" }
+influxdb3_sys_events = { path = "../influxdb3_sys_events" }
influxdb3_py_api = {path = "../influxdb3_py_api"}
# crates.io dependencies
diff --git a/influxdb3_write/src/chunk.rs b/influxdb3_write/src/chunk.rs
index 964686bd71..7ee70bf862 100644
--- a/influxdb3_write/src/chunk.rs
+++ b/influxdb3_write/src/chunk.rs
@@ -64,13 +64,13 @@ impl QueryChunk for BufferChunk {
#[derive(Debug)]
pub struct ParquetChunk {
- pub(crate) schema: Schema,
- pub(crate) stats: Arc,
- pub(crate) partition_id: TransitionPartitionId,
- pub(crate) sort_key: Option,
- pub(crate) id: ChunkId,
- pub(crate) chunk_order: ChunkOrder,
- pub(crate) parquet_exec: ParquetExecInput,
+ pub schema: Schema,
+ pub stats: Arc,
+ pub partition_id: TransitionPartitionId,
+ pub sort_key: Option,
+ pub id: ChunkId,
+ pub chunk_order: ChunkOrder,
+ pub parquet_exec: ParquetExecInput,
}
impl QueryChunk for ParquetChunk {
diff --git a/influxdb3_write/src/lib.rs b/influxdb3_write/src/lib.rs
index 33fadd1f71..d6faca0c96 100644
--- a/influxdb3_write/src/lib.rs
+++ b/influxdb3_write/src/lib.rs
@@ -260,6 +260,32 @@ impl PersistedSnapshot {
.or_default()
.push(parquet_file);
}
+
+ pub fn db_table_and_file_count(&self) -> (u64, u64, u64) {
+ let mut db_count = 0;
+ let mut table_count = 0;
+ let mut file_count = 0;
+ for (_, db_tables) in &self.databases {
+ db_count += 1;
+ table_count += db_tables.tables.len() as u64;
+ file_count += db_tables.tables.values().fold(0, |mut acc, files| {
+ acc += files.len() as u64;
+ acc
+ });
+ }
+ (db_count, table_count, file_count)
+ }
+
+ pub fn overall_db_table_file_counts(host_snapshots: &[PersistedSnapshot]) -> (u64, u64, u64) {
+ let overall_counts = host_snapshots.iter().fold((0, 0, 0), |mut acc, item| {
+ let (db_count, table_count, file_count) = item.db_table_and_file_count();
+ acc.0 += db_count;
+ acc.1 += table_count;
+ acc.2 += file_count;
+ acc
+ });
+ overall_counts
+ }
}
#[derive(Debug, Serialize, Deserialize, Default, Eq, PartialEq, Clone)]
@@ -274,8 +300,11 @@ pub struct ParquetFile {
pub path: String,
pub size_bytes: u64,
pub row_count: u64,
+ /// chunk time nanos
pub chunk_time: i64,
+ /// min time nanos
pub min_time: i64,
+ /// max time nanos
pub max_time: i64,
}
@@ -429,3 +458,175 @@ pub(crate) mod test_help {
))
}
}
+
+#[cfg(test)]
+mod tests {
+ use influxdb3_catalog::catalog::CatalogSequenceNumber;
+ use influxdb3_id::{ColumnId, DbId, ParquetFileId, SerdeVecMap, TableId};
+ use influxdb3_wal::{SnapshotSequenceNumber, WalFileSequenceNumber};
+
+ use crate::{DatabaseTables, ParquetFile, PersistedSnapshot};
+
+ #[test]
+ fn test_overall_counts() {
+ let host = "host_id";
+ // db 1 setup
+ let db_id_1 = DbId::from(0);
+ let mut dbs_1 = SerdeVecMap::new();
+ let table_id_1 = TableId::from(0);
+ let mut tables_1 = SerdeVecMap::new();
+ let parquet_files_1 = vec![
+ ParquetFile {
+ id: ParquetFileId::from(1),
+ path: "some_path".to_string(),
+ size_bytes: 100_000,
+ row_count: 200,
+ chunk_time: 1123456789,
+ min_time: 11234567777,
+ max_time: 11234567788,
+ },
+ ParquetFile {
+ id: ParquetFileId::from(2),
+ path: "some_path".to_string(),
+ size_bytes: 100_000,
+ row_count: 200,
+ chunk_time: 1123456789,
+ min_time: 11234567777,
+ max_time: 11234567788,
+ },
+ ];
+ tables_1.insert(table_id_1, parquet_files_1);
+ dbs_1.insert(db_id_1, DatabaseTables { tables: tables_1 });
+
+ // add dbs_1 to snapshot
+ let persisted_snapshot_1 = PersistedSnapshot {
+ host_id: host.to_string(),
+ next_file_id: ParquetFileId::from(0),
+ next_db_id: DbId::from(1),
+ next_table_id: TableId::from(1),
+ next_column_id: ColumnId::from(1),
+ snapshot_sequence_number: SnapshotSequenceNumber::new(124),
+ wal_file_sequence_number: WalFileSequenceNumber::new(100),
+ catalog_sequence_number: CatalogSequenceNumber::new(100),
+ databases: dbs_1,
+ min_time: 0,
+ max_time: 1,
+ row_count: 0,
+ parquet_size_bytes: 0,
+ };
+
+ // db 2 setup
+ let db_id_2 = DbId::from(2);
+ let mut dbs_2 = SerdeVecMap::new();
+ let table_id_2 = TableId::from(2);
+ let mut tables_2 = SerdeVecMap::new();
+ let parquet_files_2 = vec![
+ ParquetFile {
+ id: ParquetFileId::from(4),
+ path: "some_path".to_string(),
+ size_bytes: 100_000,
+ row_count: 200,
+ chunk_time: 1123456789,
+ min_time: 11234567777,
+ max_time: 11234567788,
+ },
+ ParquetFile {
+ id: ParquetFileId::from(5),
+ path: "some_path".to_string(),
+ size_bytes: 100_000,
+ row_count: 200,
+ chunk_time: 1123456789,
+ min_time: 11234567777,
+ max_time: 11234567788,
+ },
+ ];
+ tables_2.insert(table_id_2, parquet_files_2);
+ dbs_2.insert(db_id_2, DatabaseTables { tables: tables_2 });
+
+ // add dbs_2 to snapshot
+ let persisted_snapshot_2 = PersistedSnapshot {
+ host_id: host.to_string(),
+ next_file_id: ParquetFileId::from(5),
+ next_db_id: DbId::from(2),
+ next_table_id: TableId::from(22),
+ next_column_id: ColumnId::from(22),
+ snapshot_sequence_number: SnapshotSequenceNumber::new(124),
+ wal_file_sequence_number: WalFileSequenceNumber::new(100),
+ catalog_sequence_number: CatalogSequenceNumber::new(100),
+ databases: dbs_2,
+ min_time: 0,
+ max_time: 1,
+ row_count: 0,
+ parquet_size_bytes: 0,
+ };
+
+ let overall_counts = PersistedSnapshot::overall_db_table_file_counts(&[
+ persisted_snapshot_1,
+ persisted_snapshot_2,
+ ]);
+ assert_eq!((2, 2, 4), overall_counts);
+ }
+
+ #[test]
+ fn test_overall_counts_zero() {
+ // db 1 setup
+ let db_id_1 = DbId::from(0);
+ let mut dbs_1 = SerdeVecMap::new();
+ let table_id_1 = TableId::from(0);
+ let mut tables_1 = SerdeVecMap::new();
+ let parquet_files_1 = vec![
+ ParquetFile {
+ id: ParquetFileId::from(1),
+ path: "some_path".to_string(),
+ size_bytes: 100_000,
+ row_count: 200,
+ chunk_time: 1123456789,
+ min_time: 11234567777,
+ max_time: 11234567788,
+ },
+ ParquetFile {
+ id: ParquetFileId::from(2),
+ path: "some_path".to_string(),
+ size_bytes: 100_000,
+ row_count: 200,
+ chunk_time: 1123456789,
+ min_time: 11234567777,
+ max_time: 11234567788,
+ },
+ ];
+ tables_1.insert(table_id_1, parquet_files_1);
+ dbs_1.insert(db_id_1, DatabaseTables { tables: tables_1 });
+
+ // db 2 setup
+ let db_id_2 = DbId::from(2);
+ let mut dbs_2 = SerdeVecMap::new();
+ let table_id_2 = TableId::from(2);
+ let mut tables_2 = SerdeVecMap::new();
+ let parquet_files_2 = vec![
+ ParquetFile {
+ id: ParquetFileId::from(4),
+ path: "some_path".to_string(),
+ size_bytes: 100_000,
+ row_count: 200,
+ chunk_time: 1123456789,
+ min_time: 11234567777,
+ max_time: 11234567788,
+ },
+ ParquetFile {
+ id: ParquetFileId::from(5),
+ path: "some_path".to_string(),
+ size_bytes: 100_000,
+ row_count: 200,
+ chunk_time: 1123456789,
+ min_time: 11234567777,
+ max_time: 11234567788,
+ },
+ ];
+ tables_2.insert(table_id_2, parquet_files_2);
+ dbs_2.insert(db_id_2, DatabaseTables { tables: tables_2 });
+
+ // add dbs_2 to snapshot
+ let overall_counts = PersistedSnapshot::overall_db_table_file_counts(&[]);
+ assert_eq!((0, 0, 0), overall_counts);
+ }
+}
diff --git a/rust-toolchain.toml b/rust-toolchain.toml
index cb83ddb211..a7cb1eca14 100644
--- a/rust-toolchain.toml
+++ b/rust-toolchain.toml
@@ -1,3 +1,3 @@
[toolchain]
-channel = "1.83.0"
+channel = "1.84.0"
components = ["rustfmt", "clippy", "rust-analyzer"]