chore: patch enterprise back to core (#25798)
parent e421baf0bc
commit 0bdc2fa953
@@ -2832,6 +2832,7 @@ dependencies = [
name = "influxdb3_catalog"
version = "0.1.0"
dependencies = [
 "anyhow",
 "arrow",
 "bimap",
 "chrono",
@@ -3202,6 +3203,7 @@ dependencies = [
 "influxdb3_id",
 "influxdb3_internal_api",
 "influxdb3_py_api",
 "influxdb3_sys_events",
 "influxdb3_telemetry",
 "influxdb3_test_helpers",
 "influxdb3_wal",
deny.toml (14 changed lines)
@@ -1,7 +1,8 @@
# Configuration documentation:
# https://embarkstudios.github.io/cargo-deny/index.html

[advisories]
version = 2
yanked = "deny"
ignore = [
    # dependent on datafusion-common moving away from instant
@@ -11,6 +12,8 @@ ignore = [
git-fetch-with-cli = true

[licenses]
version = 2
unused-allowed-license = "warn"
allow = [
    "Apache-2.0",
    "BSD-2-Clause",
@@ -22,7 +25,6 @@ allow = [
    "Unicode-3.0",
    "Zlib",
]

exceptions = [
    # We should probably NOT bundle CA certs but use the OS ones.
    { name = "webpki-roots", allow = ["MPL-2.0"] },
@@ -41,10 +43,10 @@ license-files = [
github = ["influxdata"]

[bans]
-multiple-versions = "warn"
+multiple-versions = "allow"
deny = [
    # We are using rustls as the TLS implementation, so we shouldn't be linking
    # in OpenSSL too.
    #
    # If you're hitting this, you might want to take a look at what new
    # dependencies you have introduced and check if there's a way to depend on
@@ -53,4 +55,6 @@ deny = [
    # We've decided to use the `humantime` crate to parse and generate friendly time formats; use
    # that rather than chrono-english.
    { name = "chrono-english" },
    # Use stdlib ( https://doc.rust-lang.org/stable/std/io/trait.IsTerminal.html )
    { name = "atty" },
]
@@ -473,7 +473,7 @@ impl Node {
        let block = builder.append_block(value.0.as_bytes().into());
        for _ in 0..count {
            builder
-               .try_append_view(block, 0u32, value.0.as_bytes().len() as u32)
+               .try_append_view(block, 0u32, value.0.len() as u32)
                .expect("append view for known valid block, offset and length");
        }
    }
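The two `try_append_view` calls above differ only in dropping a redundant `as_bytes()`: for a `&str`, `len()` already reports the UTF-8 byte length, so the shorter form is equivalent (the same change appears again in `validate_db_name` further down, presumably prompted by the toolchain bump at the end of this diff). A standalone check, not part of the diff:

fn main() {
    let s = "héllo";
    // str::len() is the UTF-8 byte length, so as_bytes().len() adds nothing.
    assert_eq!(s.len(), s.as_bytes().len()); // 6 bytes
    assert_eq!(s.chars().count(), 5); // 5 characters
}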
@@ -17,6 +17,7 @@ influxdb3_id = { path = "../influxdb3_id" }
influxdb3_wal = { path = "../influxdb3_wal" }

# crates.io dependencies
anyhow.workspace = true
arrow.workspace = true
bimap.workspace = true
chrono.workspace = true
@@ -4,7 +4,7 @@ use datafusion::config::ConfigExtension;
use iox_query::config::IoxConfigExt;

/// Extends the standard [`HashMap`] based DataFusion config option in the CLI with specific
-/// options (along with defaults) for InfluxDB 3 OSS/Pro. This is intended for customization of
+/// options (along with defaults) for InfluxDB 3 Core/Enterprise. This is intended for customization of
/// options that are defined in the `iox_query` crate, e.g., those defined in [`IoxConfigExt`]
/// that are relevant to the monolithinc versions of InfluxDB 3.
#[derive(Debug, clap::Parser, Clone)]
@@ -852,7 +852,7 @@ impl<'c> WriteRequestBuilder<'c, NoBody> {
    }
}

-impl<'c> WriteRequestBuilder<'c, Body> {
+impl WriteRequestBuilder<'_, Body> {
    /// Send the request to the server
    pub async fn send(self) -> Result<()> {
        let url = self.client.base_url.join("/api/v3/write_lp")?;
@@ -900,7 +900,7 @@ pub struct QueryRequestBuilder<'c> {
// TODO - for now the send method just returns the bytes from the response.
// It may be nicer to have the format parameter dictate how we return from
// send, e.g., using types more specific to the format selected.
-impl<'c> QueryRequestBuilder<'c> {
+impl QueryRequestBuilder<'_> {
    /// Specify the format, `json`, `csv`, `pretty`, or `parquet`
    pub fn format(mut self, format: Format) -> Self {
        self.format = Some(format);
@@ -1101,7 +1101,7 @@ pub struct ShowDatabasesRequestBuilder<'c> {
    show_deleted: bool,
}

-impl<'c> ShowDatabasesRequestBuilder<'c> {
+impl ShowDatabasesRequestBuilder<'_> {
    /// Specify whether or not to show deleted databases in the output
    pub fn with_show_deleted(mut self, show_deleted: bool) -> Self {
        self.show_deleted = show_deleted;
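The last three hunks each replace a named builder lifetime with the anonymous `'_` lifetime: when the lifetime parameter is never referred to inside the `impl` block, naming it is pure noise. A minimal sketch of the idiom, with an illustrative `RequestBuilder` stand-in rather than the real client types:

struct RequestBuilder<'c> {
    client: &'c str, // stand-in field; the real builders hold a client reference
}

// Before: impl<'c> RequestBuilder<'c> { ... }
// After: the anonymous lifetime keeps the signature honest without the extra name.
impl RequestBuilder<'_> {
    fn describe(&self) -> String {
        format!("builder for {}", self.client)
    }
}

fn main() {
    let b = RequestBuilder { client: "http://localhost:8181" };
    println!("{}", b.describe());
}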
@@ -24,6 +24,8 @@ pub enum QueryExecutorError {
    DatabasesToRecordBatch(#[source] ArrowError),
    #[error("unable to compose record batches from retention policies: {0}")]
    RetentionPoliciesToRecordBatch(#[source] ArrowError),
    #[error("invokded a method that is not implemented: {0}")]
    MethodNotImplemented(&'static str),
}

#[async_trait]
@@ -247,6 +247,10 @@ impl Error {
    fn into_response(self) -> Response<Body> {
        debug!(error = ?self, "API error");
        match self {
            Self::Query(err @ QueryExecutorError::MethodNotImplemented(_)) => Response::builder()
                .status(StatusCode::METHOD_NOT_ALLOWED)
                .body(Body::from(err.to_string()))
                .unwrap(),
            Self::WriteBuffer(err @ WriteBufferError::DatabaseNotFound { db_name: _ }) => {
                Response::builder()
                    .status(StatusCode::NOT_FOUND)
@@ -1254,7 +1258,7 @@ fn json_content_type(headers: &HeaderMap) -> bool {
    };

    let is_json_content_type = mime.type_() == "application"
-        && (mime.subtype() == "json" || mime.suffix().map_or(false, |name| name == "json"));
+        && (mime.subtype() == "json" || mime.suffix().is_some_and(|name| name == "json"));

    is_json_content_type
}
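Here `Option::map_or(false, f)` becomes the equivalent but more direct `Option::is_some_and(f)` (stable since Rust 1.70). A small standalone equivalence check, not part of the diff:

fn main() {
    let suffix: Option<&str> = Some("json");
    // Both expressions ask: is the value present and does it satisfy the predicate?
    let old_style = suffix.map_or(false, |name| name == "json");
    let new_style = suffix.is_some_and(|name| name == "json");
    assert_eq!(old_style, new_style);
    assert!(new_style);
}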
@@ -1329,7 +1333,7 @@ fn validate_db_name(name: &str, accept_rp: bool) -> Result<(), ValidateDbNameErr
    let mut rp_seperator_found = false;
    let mut last_char = None;
    for grapheme in name.graphemes(true) {
-        if grapheme.as_bytes().len() > 1 {
+        if grapheme.len() > 1 {
            // In the case of a unicode we need to handle multibyte chars
            return Err(ValidateDbNameError::InvalidChar);
        }
@@ -1,4 +1,4 @@
-//! InfluxDB 3 Core server implementation
+//! InfluxDB 3 Enterprise server implementation
//!
//! The server is responsible for handling the HTTP API
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
@@ -789,7 +789,7 @@ mod tests {
        let parquet_metrics_provider: Arc<PersistedFiles> =
            Arc::clone(&write_buffer_impl.persisted_files());
        let sample_telem_store =
-            TelemetryStore::new_without_background_runners(parquet_metrics_provider);
+            TelemetryStore::new_without_background_runners(Some(parquet_metrics_provider));
        let write_buffer: Arc<dyn WriteBuffer> = write_buffer_impl;
        let common_state = crate::CommonServerState::new(
            Arc::clone(&metrics),
@@ -133,55 +133,16 @@ impl QueryExecutor for QueryExecutorImpl {
                db_name: database.to_string(),
            })?;

-        let params = params.unwrap_or_default();
-
-        let token = db.record_query(
-            external_span_ctx.as_ref().map(RequestLogContext::ctx),
-            kind.query_type(),
-            Box::new(query.to_string()),
-            params.clone(),
-        );
-
-        // NOTE - we use the default query configuration on the IOxSessionContext here:
-        let ctx = db.new_query_context(span_ctx, Default::default());
-        let planner = Planner::new(&ctx);
-        let query = query.to_string();
-
-        // Perform query planning on a separate threadpool than the IO runtime that is servicing
-        // this request by using `IOxSessionContext::run`.
-        let plan = ctx
-            .run(async move {
-                match kind {
-                    QueryKind::Sql => planner.sql(query, params).await,
-                    QueryKind::InfluxQl => planner.influxql(query, params).await,
-                }
-            })
-            .await;
-
-        let plan = match plan.map_err(QueryExecutorError::QueryPlanning) {
-            Ok(plan) => plan,
-            Err(e) => {
-                token.fail();
-                return Err(e);
-            }
-        };
-        let token = token.planned(&ctx, Arc::clone(&plan));
-
-        // TODO: Enforce concurrency limit here
-        let token = token.permit();
-
-        self.telemetry_store.update_num_queries();
-
-        match ctx.execute_stream(Arc::clone(&plan)).await {
-            Ok(query_results) => {
-                token.success();
-                Ok(query_results)
-            }
-            Err(err) => {
-                token.fail();
-                Err(QueryExecutorError::ExecuteStream(err))
-            }
-        }
+        query_database(
+            db,
+            query,
+            params,
+            kind,
+            span_ctx,
+            external_span_ctx,
+            Arc::clone(&self.telemetry_store),
+        )
+        .await
    }

    fn show_databases(
@@ -271,6 +232,66 @@ impl QueryExecutor for QueryExecutorImpl {
    }
}

+async fn query_database(
+    db: Arc<dyn QueryNamespace>,
+    query: &str,
+    params: Option<StatementParams>,
+    kind: QueryKind,
+    span_ctx: Option<SpanContext>,
+    external_span_ctx: Option<RequestLogContext>,
+    telemetry_store: Arc<TelemetryStore>,
+) -> Result<SendableRecordBatchStream, QueryExecutorError> {
+    let params = params.unwrap_or_default();
+
+    let token = db.record_query(
+        external_span_ctx.as_ref().map(RequestLogContext::ctx),
+        kind.query_type(),
+        Box::new(query.to_string()),
+        params.clone(),
+    );
+
+    // NOTE - we use the default query configuration on the IOxSessionContext here:
+    let ctx = db.new_query_context(span_ctx, Default::default());
+    let planner = Planner::new(&ctx);
+    let query = query.to_string();
+
+    // Perform query planning on a separate threadpool than the IO runtime that is servicing
+    // this request by using `IOxSessionContext::run`.
+    let plan = ctx
+        .run(async move {
+            match kind {
+                QueryKind::Sql => planner.sql(query, params).await,
+                QueryKind::InfluxQl => planner.influxql(query, params).await,
+            }
+        })
+        .await;
+
+    let plan = match plan.map_err(QueryExecutorError::QueryPlanning) {
+        Ok(plan) => plan,
+        Err(e) => {
+            token.fail();
+            return Err(e);
+        }
+    };
+    let token = token.planned(&ctx, Arc::clone(&plan));
+
+    // TODO: Enforce concurrency limit here
+    let token = token.permit();
+
+    telemetry_store.update_num_queries();
+
+    match ctx.execute_stream(Arc::clone(&plan)).await {
+        Ok(query_results) => {
+            token.success();
+            Ok(query_results)
+        }
+        Err(err) => {
+            token.fail();
+            Err(QueryExecutorError::ExecuteStream(err))
+        }
+    }
+}

#[derive(Debug)]
struct RetentionPolicyRow {
    database: String,
@@ -351,21 +372,18 @@ impl QueryDatabase for QueryExecutorImpl {
                db_name: name.into(),
            }))
        })?;
-        Ok(Some(Arc::new(Database::new(
+        Ok(Some(Arc::new(Database::new(CreateDatabaseArgs {
            db_schema,
-            Arc::clone(&self.write_buffer),
-            Arc::clone(&self.exec),
-            Arc::clone(&self.datafusion_config),
-            Arc::clone(&self.query_log),
-            Arc::clone(&self.sys_events_store),
-        ))))
+            write_buffer: Arc::clone(&self.write_buffer),
+            exec: Arc::clone(&self.exec),
+            datafusion_config: Arc::clone(&self.datafusion_config),
+            query_log: Arc::clone(&self.query_log),
+            sys_events_store: Arc::clone(&self.sys_events_store),
+        }))))
    }

    async fn acquire_semaphore(&self, span: Option<Span>) -> InstrumentedAsyncOwnedSemaphorePermit {
-        Arc::clone(&self.query_execution_semaphore)
-            .acquire_owned(span)
-            .await
-            .expect("Semaphore should not be closed by anyone")
+        acquire_semaphore(Arc::clone(&self.query_execution_semaphore), span).await
    }

    fn query_log(&self) -> QueryLogEntries {
@@ -373,6 +391,27 @@ impl QueryDatabase for QueryExecutorImpl {
    }
}

+async fn acquire_semaphore(
+    semaphore: Arc<InstrumentedAsyncSemaphore>,
+    span: Option<Span>,
+) -> InstrumentedAsyncOwnedSemaphorePermit {
+    semaphore
+        .acquire_owned(span)
+        .await
+        .expect("Semaphore should not be closed by anyone")
+}
+
+/// Arguments for [`Database::new`]
+#[derive(Debug)]
+pub struct CreateDatabaseArgs {
+    db_schema: Arc<DatabaseSchema>,
+    write_buffer: Arc<dyn WriteBuffer>,
+    exec: Arc<Executor>,
+    datafusion_config: Arc<HashMap<String, String>>,
+    query_log: Arc<QueryLog>,
+    sys_events_store: Arc<SysEventStore>,
+}

#[derive(Debug, Clone)]
pub struct Database {
    db_schema: Arc<DatabaseSchema>,
@@ -385,12 +424,14 @@ pub struct Database {

impl Database {
    pub fn new(
-        db_schema: Arc<DatabaseSchema>,
-        write_buffer: Arc<dyn WriteBuffer>,
-        exec: Arc<Executor>,
-        datafusion_config: Arc<HashMap<String, String>>,
-        query_log: Arc<QueryLog>,
-        sys_events_store: Arc<SysEventStore>,
+        CreateDatabaseArgs {
+            db_schema,
+            write_buffer,
+            exec,
+            datafusion_config,
+            query_log,
+            sys_events_store,
+        }: CreateDatabaseArgs,
    ) -> Self {
        let system_schema_provider = Arc::new(SystemSchemaProvider::AllSystemSchemaTables(
            AllSystemSchemaTablesProvider::new(
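The two hunks above move `Database::new` from six positional arguments to a single `CreateDatabaseArgs` struct that is destructured directly in the parameter list, so call sites name every field and the body needs no `args.` prefixes. A minimal sketch of the pattern with illustrative `Widget`/`CreateWidgetArgs` names (not repo types):

use std::sync::Arc;

#[derive(Debug)]
struct CreateWidgetArgs {
    name: Arc<str>,
    capacity: usize,
}

#[derive(Debug)]
struct Widget {
    name: Arc<str>,
    capacity: usize,
}

impl Widget {
    // Destructuring in the signature binds the fields as plain locals.
    fn new(CreateWidgetArgs { name, capacity }: CreateWidgetArgs) -> Self {
        Self { name, capacity }
    }
}

fn main() {
    let w = Widget::new(CreateWidgetArgs {
        name: Arc::from("buffer"),
        capacity: 8,
    });
    println!("{w:?}");
}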
@@ -626,6 +667,7 @@ impl TableProvider for QueryTable {
        provider.scan(ctx, projection, &filters, limit).await
    }
}

#[cfg(test)]
mod tests {
    use std::{num::NonZeroUsize, sync::Arc, time::Duration};
@@ -679,7 +721,12 @@ mod tests {
        ))
    }

-    async fn setup() -> (Arc<dyn WriteBuffer>, QueryExecutorImpl, Arc<MockProvider>) {
+    pub(crate) async fn setup() -> (
+        Arc<dyn WriteBuffer>,
+        QueryExecutorImpl,
+        Arc<MockProvider>,
+        Arc<SysEventStore>,
+    ) {
        // Set up QueryExecutor
        let object_store: Arc<dyn ObjectStore> =
            Arc::new(LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap());
@@ -718,7 +765,7 @@ mod tests {
            .unwrap();

        let persisted_files: Arc<PersistedFiles> = Arc::clone(&write_buffer_impl.persisted_files());
-        let telemetry_store = TelemetryStore::new_without_background_runners(persisted_files);
+        let telemetry_store = TelemetryStore::new_without_background_runners(Some(persisted_files));
        let sys_events_store = Arc::new(SysEventStore::new(Arc::<MockProvider>::clone(
            &time_provider,
        )));
@@ -733,15 +780,20 @@ mod tests {
            datafusion_config,
            query_log_size: 10,
            telemetry_store,
-            sys_events_store,
+            sys_events_store: Arc::clone(&sys_events_store),
        });

-        (write_buffer, query_executor, time_provider)
+        (
+            write_buffer,
+            query_executor,
+            time_provider,
+            sys_events_store,
+        )
    }

    #[test_log::test(tokio::test)]
    async fn system_parquet_files_success() {
-        let (write_buffer, query_executor, time_provider) = setup().await;
+        let (write_buffer, query_executor, time_provider, _) = setup().await;
        // Perform some writes to multiple tables
        let db_name = "test_db";
        // perform writes over time to generate WAL files and some snapshots
@@ -125,7 +125,8 @@ mod tests {
    #[test]
    fn test_sample_all_metrics() {
        let mut mock_sys_info_provider = MockSystemInfoProvider::new();
-        let store = TelemetryStore::new_without_background_runners(Arc::from(MockParquetMetrics));
+        let store =
+            TelemetryStore::new_without_background_runners(Some(Arc::from(MockParquetMetrics)));

        mock_sys_info_provider
            .expect_get_pid()
@@ -145,7 +146,8 @@ mod tests {
    #[test]
    fn test_sample_all_metrics_with_call_failure() {
        let mut mock_sys_info_provider = MockSystemInfoProvider::new();
-        let store = TelemetryStore::new_without_background_runners(Arc::from(MockParquetMetrics));
+        let store =
+            TelemetryStore::new_without_background_runners(Some(Arc::from(MockParquetMetrics)));

        mock_sys_info_provider
            .expect_get_pid()
@@ -71,7 +71,9 @@ impl TelemetryStore {
        store
    }

-    pub fn new_without_background_runners(persisted_files: Arc<dyn ParquetMetrics>) -> Arc<Self> {
+    pub fn new_without_background_runners(
+        persisted_files: Option<Arc<dyn ParquetMetrics>>,
+    ) -> Arc<Self> {
        let instance_id = Arc::from("sample-instance-id");
        let os = Arc::from("Linux");
        let influx_version = Arc::from("influxdb3-0.1.0");
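The constructor now takes `Option<Arc<dyn ParquetMetrics>>`, so callers without a parquet-metrics provider can pass `None` instead of supplying a stub, and the struct field no longer needs wrapping in `Some` (see the next hunk). A rough sketch of that shape, using a hypothetical `parquet_file_count` method purely for illustration:

use std::sync::Arc;

// Hypothetical stand-in for the repo's ParquetMetrics trait.
trait ParquetMetrics: Send + Sync {
    fn parquet_file_count(&self) -> u64;
}

struct NoopMetrics;

impl ParquetMetrics for NoopMetrics {
    fn parquet_file_count(&self) -> u64 {
        42
    }
}

struct Store {
    metrics: Option<Arc<dyn ParquetMetrics>>,
}

impl Store {
    fn new(metrics: Option<Arc<dyn ParquetMetrics>>) -> Self {
        Self { metrics }
    }

    fn file_count(&self) -> u64 {
        // Fall back to 0 when no provider was supplied.
        self.metrics.as_ref().map_or(0, |m| m.parquet_file_count())
    }
}

fn main() {
    assert_eq!(Store::new(None).file_count(), 0);
    let metrics: Arc<dyn ParquetMetrics> = Arc::new(NoopMetrics);
    assert_eq!(Store::new(Some(metrics)).file_count(), 42);
}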
@@ -80,7 +82,7 @@ impl TelemetryStore {
        let inner = TelemetryStoreInner::new(instance_id, os, influx_version, storage_type, cores);
        Arc::new(TelemetryStore {
            inner: parking_lot::Mutex::new(inner),
-            persisted_files: Some(persisted_files),
+            persisted_files,
        })
    }
@@ -191,7 +193,7 @@ impl TelemetryStoreInner {
            instance_id: self.instance_id.clone(),
            storage_type: self.storage_type.clone(),
            cores: self.cores,
-            product_type: "OSS",
+            product_type: "Core",
            uptime_secs: self.start_timer.elapsed().as_secs(),

            cpu_utilization_percent_min_1m: self.cpu.utilization.min,
@@ -316,7 +318,7 @@ mod tests {
        let store: Arc<TelemetryStore> = TelemetryStore::new(
            Arc::from("some-instance-id"),
            Arc::from("Linux"),
-            Arc::from("OSS-v3.0"),
+            Arc::from("Core-v3.0"),
            Arc::from("Memory"),
            10,
            Some(parqet_file_metrics),
@@ -43,6 +43,19 @@ pub fn write_batch_op(write_batch: WriteBatch) -> WalOp {
    WalOp::Write(write_batch)
}

+pub fn catalog_batch_op(
+    db_id: DbId,
+    db_name: impl Into<Arc<str>>,
+    time_ns: i64,
+    ops: impl IntoIterator<Item = CatalogOp>,
+    sequence_number: u32,
+) -> WalOp {
+    WalOp::Catalog(OrderedCatalogBatch::new(
+        catalog_batch(db_id, db_name, time_ns, ops),
+        sequence_number,
+    ))
+}

pub fn catalog_batch(
    db_id: DbId,
    db_name: impl Into<Arc<str>>,
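The new `catalog_batch_op` helper takes `impl Into<Arc<str>>` and `impl IntoIterator` parameters, so test code can pass a plain `&str` name and an array literal of ops without manual conversions. A self-contained sketch of that parameter style; the `Event`/`Batch`/`build_batch` names are illustrative, not from the repo:

use std::sync::Arc;

#[derive(Debug)]
struct Event(u64);

#[derive(Debug)]
struct Batch {
    name: Arc<str>,
    events: Vec<Event>,
}

// Into<Arc<str>> accepts &str, String, or Arc<str>; IntoIterator accepts a
// Vec, an array, or any other iterable of Event.
fn build_batch(name: impl Into<Arc<str>>, events: impl IntoIterator<Item = Event>) -> Batch {
    Batch {
        name: name.into(),
        events: events.into_iter().collect(),
    }
}

fn main() {
    let batch = build_batch("test_db", [Event(1), Event(2)]);
    println!("{batch:?}");
}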
@@ -16,7 +16,6 @@ use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId};
use influxdb_line_protocol::v3::SeriesValue;
use influxdb_line_protocol::FieldValue;
use iox_time::Time;
use observability_deps::tracing::error;
use schema::{InfluxColumnType, InfluxFieldType};
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
@@ -320,6 +319,10 @@ impl OrderedCatalogBatch {
    pub fn batch(&self) -> &CatalogBatch {
        &self.catalog
    }

+    pub fn into_batch(self) -> CatalogBatch {
+        self.catalog
+    }
}

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
@@ -252,6 +252,7 @@ impl WalObjectStore {
            .await
        };
        info!(
            host = self.host_identifier_prefix,
            n_ops = %wal_contents.ops.len(),
            min_timestamp_ns = %wal_contents.min_timestamp_ns,
            max_timestamp_ns = %wal_contents.max_timestamp_ns,
@@ -32,6 +32,7 @@ influxdb3_internal_api = { path = "../influxdb3_internal_api" }
influxdb3_test_helpers = { path = "../influxdb3_test_helpers" }
influxdb3_wal = { path = "../influxdb3_wal" }
influxdb3_telemetry = { path = "../influxdb3_telemetry" }
influxdb3_sys_events = { path = "../influxdb3_sys_events" }
influxdb3_py_api = {path = "../influxdb3_py_api"}

# crates.io dependencies
@@ -64,13 +64,13 @@ impl QueryChunk for BufferChunk {

#[derive(Debug)]
pub struct ParquetChunk {
-    pub(crate) schema: Schema,
-    pub(crate) stats: Arc<ChunkStatistics>,
-    pub(crate) partition_id: TransitionPartitionId,
-    pub(crate) sort_key: Option<SortKey>,
-    pub(crate) id: ChunkId,
-    pub(crate) chunk_order: ChunkOrder,
-    pub(crate) parquet_exec: ParquetExecInput,
+    pub schema: Schema,
+    pub stats: Arc<ChunkStatistics>,
+    pub partition_id: TransitionPartitionId,
+    pub sort_key: Option<SortKey>,
+    pub id: ChunkId,
+    pub chunk_order: ChunkOrder,
+    pub parquet_exec: ParquetExecInput,
}

impl QueryChunk for ParquetChunk {
@@ -260,6 +260,32 @@ impl PersistedSnapshot {
            .or_default()
            .push(parquet_file);
    }

+    pub fn db_table_and_file_count(&self) -> (u64, u64, u64) {
+        let mut db_count = 0;
+        let mut table_count = 0;
+        let mut file_count = 0;
+        for (_, db_tables) in &self.databases {
+            db_count += 1;
+            table_count += db_tables.tables.len() as u64;
+            file_count += db_tables.tables.values().fold(0, |mut acc, files| {
+                acc += files.len() as u64;
+                acc
+            });
+        }
+        (db_count, table_count, file_count)
+    }
+
+    pub fn overall_db_table_file_counts(host_snapshots: &[PersistedSnapshot]) -> (u64, u64, u64) {
+        let overall_counts = host_snapshots.iter().fold((0, 0, 0), |mut acc, item| {
+            let (db_count, table_count, file_count) = item.db_table_and_file_count();
+            acc.0 += db_count;
+            acc.1 += table_count;
+            acc.2 += file_count;
+            acc
+        });
+        overall_counts
+    }
}

#[derive(Debug, Serialize, Deserialize, Default, Eq, PartialEq, Clone)]
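`db_table_and_file_count` walks every database and table in a snapshot to produce `(db, table, file)` counts, and `overall_db_table_file_counts` folds those per-snapshot tuples together. For comparison only, a sketch (not repo code) of the same per-snapshot counting over a simplified shape using iterator adapters instead of a manual fold:

use std::collections::HashMap;

struct Snapshot {
    // database id -> (table id -> parquet file paths); simplified stand-in types
    databases: HashMap<u32, HashMap<u32, Vec<String>>>,
}

impl Snapshot {
    fn db_table_and_file_count(&self) -> (u64, u64, u64) {
        let db_count = self.databases.len() as u64;
        let table_count = self.databases.values().map(|t| t.len() as u64).sum();
        let file_count = self
            .databases
            .values()
            .flat_map(|t| t.values())
            .map(|files| files.len() as u64)
            .sum();
        (db_count, table_count, file_count)
    }
}

fn main() {
    let mut tables = HashMap::new();
    tables.insert(0, vec!["a.parquet".to_string(), "b.parquet".to_string()]);
    let mut databases = HashMap::new();
    databases.insert(0, tables);
    let snapshot = Snapshot { databases };
    assert_eq!(snapshot.db_table_and_file_count(), (1, 1, 2));
}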
@@ -274,8 +300,11 @@ pub struct ParquetFile {
    pub path: String,
    pub size_bytes: u64,
    pub row_count: u64,
+    /// chunk time nanos
    pub chunk_time: i64,
+    /// min time nanos
    pub min_time: i64,
+    /// max time nanos
    pub max_time: i64,
}
@@ -429,3 +458,175 @@ pub(crate) mod test_help {
        ))
    }
}

#[cfg(test)]
mod tests {
    use influxdb3_catalog::catalog::CatalogSequenceNumber;
    use influxdb3_id::{ColumnId, DbId, ParquetFileId, SerdeVecMap, TableId};
    use influxdb3_wal::{SnapshotSequenceNumber, WalFileSequenceNumber};

    use crate::{DatabaseTables, ParquetFile, PersistedSnapshot};

    #[test]
    fn test_overall_counts() {
        let host = "host_id";
        // db 1 setup
        let db_id_1 = DbId::from(0);
        let mut dbs_1 = SerdeVecMap::new();
        let table_id_1 = TableId::from(0);
        let mut tables_1 = SerdeVecMap::new();
        let parquet_files_1 = vec![
            ParquetFile {
                id: ParquetFileId::from(1),
                path: "some_path".to_string(),
                size_bytes: 100_000,
                row_count: 200,
                chunk_time: 1123456789,
                min_time: 11234567777,
                max_time: 11234567788,
            },
            ParquetFile {
                id: ParquetFileId::from(2),
                path: "some_path".to_string(),
                size_bytes: 100_000,
                row_count: 200,
                chunk_time: 1123456789,
                min_time: 11234567777,
                max_time: 11234567788,
            },
        ];
        tables_1.insert(table_id_1, parquet_files_1);
        dbs_1.insert(db_id_1, DatabaseTables { tables: tables_1 });

        // add dbs_1 to snapshot
        let persisted_snapshot_1 = PersistedSnapshot {
            host_id: host.to_string(),
            next_file_id: ParquetFileId::from(0),
            next_db_id: DbId::from(1),
            next_table_id: TableId::from(1),
            next_column_id: ColumnId::from(1),
            snapshot_sequence_number: SnapshotSequenceNumber::new(124),
            wal_file_sequence_number: WalFileSequenceNumber::new(100),
            catalog_sequence_number: CatalogSequenceNumber::new(100),
            databases: dbs_1,
            min_time: 0,
            max_time: 1,
            row_count: 0,
            parquet_size_bytes: 0,
        };

        // db 2 setup
        let db_id_2 = DbId::from(2);
        let mut dbs_2 = SerdeVecMap::new();
        let table_id_2 = TableId::from(2);
        let mut tables_2 = SerdeVecMap::new();
        let parquet_files_2 = vec![
            ParquetFile {
                id: ParquetFileId::from(4),
                path: "some_path".to_string(),
                size_bytes: 100_000,
                row_count: 200,
                chunk_time: 1123456789,
                min_time: 11234567777,
                max_time: 11234567788,
            },
            ParquetFile {
                id: ParquetFileId::from(5),
                path: "some_path".to_string(),
                size_bytes: 100_000,
                row_count: 200,
                chunk_time: 1123456789,
                min_time: 11234567777,
                max_time: 11234567788,
            },
        ];
        tables_2.insert(table_id_2, parquet_files_2);
        dbs_2.insert(db_id_2, DatabaseTables { tables: tables_2 });

        // add dbs_2 to snapshot
        let persisted_snapshot_2 = PersistedSnapshot {
            host_id: host.to_string(),
            next_file_id: ParquetFileId::from(5),
            next_db_id: DbId::from(2),
            next_table_id: TableId::from(22),
            next_column_id: ColumnId::from(22),
            snapshot_sequence_number: SnapshotSequenceNumber::new(124),
            wal_file_sequence_number: WalFileSequenceNumber::new(100),
            catalog_sequence_number: CatalogSequenceNumber::new(100),
            databases: dbs_2,
            min_time: 0,
            max_time: 1,
            row_count: 0,
            parquet_size_bytes: 0,
        };

        let overall_counts = PersistedSnapshot::overall_db_table_file_counts(&[
            persisted_snapshot_1,
            persisted_snapshot_2,
        ]);
        assert_eq!((2, 2, 4), overall_counts);
    }

    #[test]
    fn test_overall_counts_zero() {
        // db 1 setup
        let db_id_1 = DbId::from(0);
        let mut dbs_1 = SerdeVecMap::new();
        let table_id_1 = TableId::from(0);
        let mut tables_1 = SerdeVecMap::new();
        let parquet_files_1 = vec![
            ParquetFile {
                id: ParquetFileId::from(1),
                path: "some_path".to_string(),
                size_bytes: 100_000,
                row_count: 200,
                chunk_time: 1123456789,
                min_time: 11234567777,
                max_time: 11234567788,
            },
            ParquetFile {
                id: ParquetFileId::from(2),
                path: "some_path".to_string(),
                size_bytes: 100_000,
                row_count: 200,
                chunk_time: 1123456789,
                min_time: 11234567777,
                max_time: 11234567788,
            },
        ];
        tables_1.insert(table_id_1, parquet_files_1);
        dbs_1.insert(db_id_1, DatabaseTables { tables: tables_1 });

        // db 2 setup
        let db_id_2 = DbId::from(2);
        let mut dbs_2 = SerdeVecMap::new();
        let table_id_2 = TableId::from(2);
        let mut tables_2 = SerdeVecMap::new();
        let parquet_files_2 = vec![
            ParquetFile {
                id: ParquetFileId::from(4),
                path: "some_path".to_string(),
                size_bytes: 100_000,
                row_count: 200,
                chunk_time: 1123456789,
                min_time: 11234567777,
                max_time: 11234567788,
            },
            ParquetFile {
                id: ParquetFileId::from(5),
                path: "some_path".to_string(),
                size_bytes: 100_000,
                row_count: 200,
                chunk_time: 1123456789,
                min_time: 11234567777,
                max_time: 11234567788,
            },
        ];
        tables_2.insert(table_id_2, parquet_files_2);
        dbs_2.insert(db_id_2, DatabaseTables { tables: tables_2 });

        // add dbs_2 to snapshot
        let overall_counts = PersistedSnapshot::overall_db_table_file_counts(&[]);
        assert_eq!((0, 0, 0), overall_counts);
    }
}
@@ -1,3 +1,3 @@
[toolchain]
-channel = "1.83.0"
+channel = "1.84.0"
components = ["rustfmt", "clippy", "rust-analyzer"]