chore: update to latest influxdb3_core (#26429)

* chore: update to latest core

* chore: allow CDLA permissive 2 license

* chore: update insta snapshot for new internal df tables

* test: update assertion in flightsql test

* fix: object store size hinting workaround in clap_blocks

Applied a workaround from upstream to strip size hinting from the object
store get request options. See:

https://github.com/influxdata/influxdb_iox/issues/13771

* fix: query_executor tests use object store size hinting workaround

* fix: insta snapshot test for show system summary command

* chore: update windows- crates for advisories

* chore: update to latest sha on influxdb3_core branch

* chore: update to latest influxdb3_core rev

* refactor: pr feedback

* refactor: do not use object store size hint layer

Instead of using the ObjectStoreStripSizeHint layer, just provide the
configuration to datafusion to disable the use of size hinting from
iox_query.

This is used in IOx and not relevant to Monolith.
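
A minimal sketch of what this amounts to, grounded in the diff below (the `iox.` prefix corresponds to `IoxConfigExt::PREFIX`): the setting is inserted into the DataFusion config map that the query executor is built with.

    // Sketch only; the real wiring lives in IoxQueryDatafusionConfig in clap_blocks.
    use std::collections::HashMap;

    fn datafusion_config_without_size_hint() -> HashMap<String, String> {
        let mut config = HashMap::new();
        // Tell iox_query not to inject a size hint into object store get requests.
        config.insert(
            "iox.hint_known_object_size_to_object_store".to_string(),
            false.to_string(),
        );
        config
    }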

* fix: use parquet cache for get_opts requests

* test: that the parquet cache is being hit from write buffer
Trevor Hilton 2025-05-26 14:11:06 -04:00 committed by GitHub
parent 9ed1af7d7a
commit 4dc61df77f
26 changed files with 1407 additions and 1188 deletions

Cargo.lock (generated): 2145 changed lines; file diff suppressed because it is too large.


@ -80,8 +80,8 @@ crossbeam-channel = "0.5.11"
csv = "1.3.0"
# Use DataFusion fork
# See https://github.com/influxdata/arrow-datafusion/pull/49 for contents
datafusion = { git = "https://github.com/influxdata/arrow-datafusion.git", rev = "ae0a57b05895ccf4d2febb9c91bbb0956cf7e863" }
datafusion-proto = { git = "https://github.com/influxdata/arrow-datafusion.git", rev = "ae0a57b05895ccf4d2febb9c91bbb0956cf7e863" }
datafusion = { git = "https://github.com/influxdata/arrow-datafusion.git", rev = "1c10b8b635831e87cb043a1e3fa8eb89be430d54" }
datafusion-proto = { git = "https://github.com/influxdata/arrow-datafusion.git", rev = "1c10b8b635831e87cb043a1e3fa8eb89be430d54" }
dashmap = "6.1.0"
dotenvy = "0.15.7"
flate2 = "1.0.27"
@ -137,7 +137,7 @@ sysinfo = "0.30.8"
tempfile = "3.14.0"
test-log = { version = "0.2.16", features = ["trace"] }
thiserror = "1.0"
tokio = { version = "1.43.1", features = ["full"] }
tokio = { version = "1.45", features = ["full"] }
tokio-util = { version = "0.7.13", features = ["rt"] }
tonic = { version = "0.11.0", features = ["tls", "tls-roots"] }
tonic-build = "0.11.0"
@ -148,41 +148,40 @@ twox-hash = "2.1.0"
unicode-segmentation = "1.11.0"
url = "2.5.0"
urlencoding = "1.1"
uuid = { version = "1", features = ["v4"] }
uuid = { version = "1", features = ["v4", "v7", "serde"] }
num = { version = "0.4.3" }
# Core.git crates we depend on
arrow_util = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f" }
authz = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f" }
data_types = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f" }
datafusion_util = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f" }
executor = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f" }
influxdb-line-protocol = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f"}
influxdb_influxql_parser = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f" }
influxdb_iox_client = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f" }
iox_catalog = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f" }
iox_http = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f" }
iox_query = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f" }
iox_query_params = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f" }
iox_query_influxql = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f" }
iox_system_tables = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f" }
iox_time = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f" }
metric = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f" }
metric_exporters = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f" }
object_store_metrics = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f" }
observability_deps = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f" }
panic_logging = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f" }
parquet_file = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f" }
schema = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f", features = ["v3"]}
service_common = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f" }
service_grpc_flight = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f" }
test_helpers = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f" }
tokio_metrics_bridge = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f" }
trace = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f" }
trace_exporters = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f" }
trace_http = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f" }
tracker = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f" }
trogging = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f", features = ["clap"] }
arrow_util = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd0e474a6c0af5ba867399d753f5df18f59907cb" }
authz = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd0e474a6c0af5ba867399d753f5df18f59907cb" }
data_types = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd0e474a6c0af5ba867399d753f5df18f59907cb" }
datafusion_util = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd0e474a6c0af5ba867399d753f5df18f59907cb" }
executor = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd0e474a6c0af5ba867399d753f5df18f59907cb" }
influxdb-line-protocol = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd0e474a6c0af5ba867399d753f5df18f59907cb", features = ["v3"]}
influxdb_influxql_parser = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd0e474a6c0af5ba867399d753f5df18f59907cb" }
influxdb_iox_client = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd0e474a6c0af5ba867399d753f5df18f59907cb" }
iox_http = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd0e474a6c0af5ba867399d753f5df18f59907cb" }
iox_query = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd0e474a6c0af5ba867399d753f5df18f59907cb" }
iox_query_params = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd0e474a6c0af5ba867399d753f5df18f59907cb" }
iox_query_influxql = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd0e474a6c0af5ba867399d753f5df18f59907cb" }
iox_system_tables = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd0e474a6c0af5ba867399d753f5df18f59907cb" }
iox_time = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd0e474a6c0af5ba867399d753f5df18f59907cb" }
metric = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd0e474a6c0af5ba867399d753f5df18f59907cb" }
metric_exporters = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd0e474a6c0af5ba867399d753f5df18f59907cb" }
object_store_metrics = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd0e474a6c0af5ba867399d753f5df18f59907cb" }
observability_deps = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd0e474a6c0af5ba867399d753f5df18f59907cb" }
panic_logging = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd0e474a6c0af5ba867399d753f5df18f59907cb" }
parquet_file = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd0e474a6c0af5ba867399d753f5df18f59907cb" }
schema = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd0e474a6c0af5ba867399d753f5df18f59907cb", features = ["v3"]}
service_common = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd0e474a6c0af5ba867399d753f5df18f59907cb" }
service_grpc_flight = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd0e474a6c0af5ba867399d753f5df18f59907cb" }
test_helpers = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd0e474a6c0af5ba867399d753f5df18f59907cb" }
tokio_metrics_bridge = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd0e474a6c0af5ba867399d753f5df18f59907cb" }
trace = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd0e474a6c0af5ba867399d753f5df18f59907cb" }
trace_exporters = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd0e474a6c0af5ba867399d753f5df18f59907cb" }
trace_http = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd0e474a6c0af5ba867399d753f5df18f59907cb" }
tracker = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd0e474a6c0af5ba867399d753f5df18f59907cb" }
trogging = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd0e474a6c0af5ba867399d753f5df18f59907cb", features = ["clap"] }
[workspace.lints.rust]
missing_copy_implementations = "deny"
@ -268,3 +267,4 @@ arrow-string = { git = "https://github.com/influxdata/arrow-rs.git", rev = "eae1
arrow-ord = { git = "https://github.com/influxdata/arrow-rs.git", rev = "eae176c21b1ef915227294e8a8a201b6f266031a" }
arrow-flight = { git = "https://github.com/influxdata/arrow-rs.git", rev = "eae176c21b1ef915227294e8a8a201b6f266031a" }
parquet = { git = "https://github.com/influxdata/arrow-rs.git", rev = "eae176c21b1ef915227294e8a8a201b6f266031a" }
object_store = { git = "https://github.com/influxdata/arrow-rs.git", rev = "c946cd81fa12e6588a3be33be08e3d8e9a2770e7" }


@ -5,9 +5,6 @@
version = 2
yanked = "deny"
ignore = [
# dependent on datafusion-common moving away from instant
# https://github.com/apache/datafusion/pull/13355
"RUSTSEC-2024-0384",
# paste crate is no longer maintained, but it is past 1.0
# Keep this here until our transient dependencies no longer
# need it
@ -24,6 +21,7 @@ allow = [
"BSD-2-Clause",
"BSD-3-Clause",
"CC0-1.0",
"CDLA-Permissive-2.0",
"ISC",
"MIT",
"Unicode-3.0",


@ -49,7 +49,7 @@ use influxdb3_write::{
persisted_files::PersistedFiles,
},
};
use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig};
use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig, PerQueryMemoryPoolConfig};
use iox_time::SystemProvider;
use object_store::ObjectStore;
use object_store_metrics::ObjectStoreMetrics;
@ -561,6 +561,7 @@ pub async fn command(config: Config) -> Result<()> {
Arc::clone(&time_provider) as _,
"main",
&metrics,
config.object_store_config.bucket.as_ref(),
));
// setup cached object store:
@ -603,6 +604,9 @@ pub async fn command(config: Config) -> Result<()> {
.collect(),
metric_registry: Arc::clone(&metrics),
mem_pool_size: config.exec_mem_pool_bytes.as_num_bytes(),
// TODO: need to make these configurable?
per_query_mem_pool_config: PerQueryMemoryPoolConfig::Disabled,
heap_memory_limit: None,
},
DedicatedExecutor::new(
"datafusion",
@ -735,6 +739,7 @@ pub async fn command(config: Config) -> Result<()> {
sys_events_store: Arc::clone(&sys_events_store),
// convert to positive here so that we can avoid double negatives downstream
started_with_auth: !config.without_auth,
time_provider: Arc::clone(&time_provider) as _,
}));
let listener = TcpListener::bind(*config.http_bind_address)


@ -1,22 +1,16 @@
---
source: influxdb3/tests/cli/mod.rs
expression: output
expression: summary_output
---
distinct_caches summary:
+-------+------+------------+--------------+-----------------+-----------------+
| table | name | column_ids | column_names | max_cardinality | max_age_seconds |
+-------+------+------------+--------------+-----------------+-----------------+
+-------+------+------------+--------------+-----------------+-----------------+
++
++
last_caches summary:
+-------+------+----------------+------------------+------------------+--------------------+-------+-----+
| table | name | key_column_ids | key_column_names | value_column_ids | value_column_names | count | ttl |
+-------+------+----------------+------------------+------------------+--------------------+-------+-----+
+-------+------+----------------+------------------+------------------+--------------------+-------+-----+
++
++
parquet_files summary:
+------------+------+------------+-----------+----------+----------+
| table_name | path | size_bytes | row_count | min_time | max_time |
+------------+------+------------+-----------+----------+----------+
+------------+------+------------+-----------+----------+----------+
++
++
processing_engine_logs summary:
++
++


@ -117,6 +117,8 @@ async fn flight() -> Result<(), influxdb3_client::Error> {
"+--------------+--------------------+----------------------------+------------+",
"| public | information_schema | columns | VIEW |",
"| public | information_schema | df_settings | VIEW |",
"| public | information_schema | parameters | VIEW |",
"| public | information_schema | routines | VIEW |",
"| public | information_schema | schemata | VIEW |",
"| public | information_schema | tables | VIEW |",
"| public | information_schema | views | VIEW |",


@ -276,7 +276,7 @@ impl TestServer {
//
// The file is deleted when it goes out of scope (the end of this method) by the TempDir type.
let tmp_dir = TempDir::new().unwrap();
let tmp_dir_path = tmp_dir.into_path();
let tmp_dir_path = tmp_dir.keep();
let tcp_addr_file = tmp_dir_path.join("tcp-listener");
let mut command = Command::cargo_bin("influxdb3").expect("create the influxdb3 command");
let command = command


@ -23,4 +23,6 @@ expression: output
| public | information_schema | columns | VIEW |
| public | information_schema | df_settings | VIEW |
| public | information_schema | schemata | VIEW |
| public | information_schema | routines | VIEW |
| public | information_schema | parameters | VIEW |
+---------------+--------------------+----------------------------+------------+


@ -1,7 +1,9 @@
use std::sync::Arc;
use async_trait::async_trait;
use authz::{Authorizer as IoxAuthorizer, Error as IoxError, Permission as IoxPermission};
use authz::{
Authorization, Authorizer as IoxAuthorizer, Error as IoxError, Permission as IoxPermission,
};
use influxdb3_id::{DbId, TokenId};
use iox_time::{Time, TimeProvider};
use observability_deps::tracing::{debug, trace};
@ -168,20 +170,16 @@ impl AuthProvider for TokenAuthenticator {
#[async_trait]
impl IoxAuthorizer for TokenAuthenticator {
async fn permissions(
async fn authorize(
&self,
token: Option<Vec<u8>>,
perms: &[IoxPermission],
) -> Result<Vec<IoxPermission>, IoxError> {
match self.authenticate(token).await {
Ok(_) => {
return Ok(perms.to_vec());
}
Err(err) => {
let iox_err = err.into();
return Err(iox_err);
}
}
) -> Result<Authorization, IoxError> {
let _token_id = self.authenticate(token).await?;
// (trevor) The "subject" may just use the `token_id` as a string, but we withheld doing so
// in order to decide and ensure that we make use of the subject consistently throughout
// the system. See: https://github.com/influxdata/influxdb/issues/26440
Ok(Authorization::new(None, perms.to_vec()))
}
}
@ -190,12 +188,12 @@ pub struct NoAuthAuthenticator;
#[async_trait]
impl IoxAuthorizer for NoAuthAuthenticator {
async fn permissions(
async fn authorize(
&self,
_token: Option<Vec<u8>>,
perms: &[IoxPermission],
) -> Result<Vec<IoxPermission>, IoxError> {
Ok(perms.to_vec())
) -> Result<Authorization, IoxError> {
Ok(Authorization::new(None, perms.to_vec()))
}
async fn probe(&self) -> Result<(), IoxError> {


@ -3,9 +3,9 @@ use std::{any::Any, sync::Arc};
use arrow::{array::RecordBatch, datatypes::SchemaRef};
use async_trait::async_trait;
use datafusion::{
catalog::{Session, TableProvider},
catalog::{Session, TableFunctionImpl, TableProvider},
common::{DFSchema, Result, internal_err, plan_err},
datasource::{TableType, function::TableFunctionImpl},
datasource::TableType,
execution::context::ExecutionProps,
logical_expr::TableProviderFilterPushDown,
physical_expr::{


@ -3,9 +3,9 @@ use std::{any::Any, sync::Arc};
use arrow::{array::RecordBatch, datatypes::SchemaRef};
use async_trait::async_trait;
use datafusion::{
catalog::{Session, TableProvider},
catalog::{Session, TableFunctionImpl, TableProvider},
common::{DFSchema, internal_err, plan_err},
datasource::{TableType, function::TableFunctionImpl},
datasource::TableType,
error::DataFusionError,
execution::context::ExecutionProps,
logical_expr::TableProviderFilterPushDown,


@ -25,8 +25,8 @@ use iox_time::TimeProvider;
use metric::Registry;
use metrics::{AccessMetrics, SizeMetrics};
use object_store::{
Error, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta,
ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, path::Path,
Error, GetOptions, GetRange, GetResult, GetResultPayload, ListResult, MultipartUpload,
ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, path::Path,
};
use observability_deps::tracing::{debug, error, info, trace, warn};
use tokio::sync::{
@ -665,6 +665,32 @@ impl std::fmt::Display for MemCachedObjectStore {
}
}
/// Check that the given [`Range`] is valid with respect to a given `object_size`.
fn check_range(range: Range<usize>, object_size: usize) -> object_store::Result<Range<usize>> {
let Range { start, end } = range;
if end > object_size {
return Err(Error::Generic {
store: STORE_NAME,
source: format!("Range end ({end}) out of bounds, object size is {object_size}",)
.into(),
});
}
if start >= object_size {
return Err(Error::Generic {
store: STORE_NAME,
source: format!("Range start ({start}) out of bounds, object size is {object_size}",)
.into(),
});
}
if start > end {
return Err(Error::Generic {
store: STORE_NAME,
source: format!("Range end ({end}) is before range start ({start})",).into(),
});
}
Ok(range)
}
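// A hypothetical unit test (not part of this diff) sketching how `check_range` above
// behaves; assumes it runs in the same module so the helper is in scope.
#[cfg(test)]
mod check_range_sketch {
    use super::check_range;

    #[test]
    fn validates_ranges_against_object_size() {
        // In-bounds ranges are returned unchanged.
        assert_eq!(check_range(0..4, 10).unwrap(), 0..4);
        // A range ending past the object size is rejected.
        assert!(check_range(0..11, 10).is_err());
        // A range starting at or beyond the object size is rejected.
        assert!(check_range(10..10, 10).is_err());
        // A range whose end precedes its start is rejected.
        assert!(check_range(5..3, 10).is_err());
    }
}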
/// [`MemCachedObjectStore`] implements most [`ObjectStore`] methods as a pass-through, since
/// caching is decided externally. The exception is `delete`, which will have the entry removed
/// from the cache if the delete to the object store was successful.
@ -705,30 +731,38 @@ impl ObjectStore for MemCachedObjectStore {
/// Get an object from the object store. If this object is cached, then it will not make a request
/// to the inner object store.
async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
if let Some(state) = self.cache.get(location) {
let v = state.value().await?;
Ok(GetResult {
payload: GetResultPayload::Stream(
futures::stream::iter([Ok(v.data.clone())]).boxed(),
),
meta: v.meta.clone(),
range: 0..v.data.len(),
attributes: Default::default(),
})
} else {
self.inner.get(location).await
}
self.get_opts(location, Default::default()).await
}
/// Get an object from the object store. If this object is cached, then it will not make a request
/// to the inner object store.
async fn get_opts(
&self,
location: &Path,
options: GetOptions,
) -> object_store::Result<GetResult> {
// NOTE(trevor): this could probably be supported through the cache if we need it via the
// ObjectMeta stored in the cache. For now this is conservative:
if let Some(state) = self.cache.get(location) {
let GetOptions { range, .. } = options;
let v = state.value().await?;
let bytes = range
.map(|r| match r {
GetRange::Bounded(range) => range,
GetRange::Offset(start) => start..v.data.len(),
GetRange::Suffix(end) => 0..end,
})
.map(|r| check_range(r, v.data.len()))
.transpose()?
.map_or_else(|| v.data.clone(), |r| v.data.slice(r));
Ok(GetResult {
payload: GetResultPayload::Stream(futures::stream::iter([Ok(bytes)]).boxed()),
meta: v.meta.clone(),
range: 0..v.data.len(),
attributes: Default::default(),
})
} else {
self.inner.get_opts(location, options).await
}
}
async fn get_range(&self, location: &Path, range: Range<usize>) -> object_store::Result<Bytes> {
Ok(self
@ -751,28 +785,8 @@ impl ObjectStore for MemCachedObjectStore {
ranges
.iter()
.map(|range| {
if range.end > v.data.len() {
return Err(Error::Generic {
store: STORE_NAME,
source: format!(
"Range end ({}) out of bounds, object size is {}",
range.end,
v.data.len()
)
.into(),
});
}
if range.start > range.end {
return Err(Error::Generic {
store: STORE_NAME,
source: format!(
"Range end ({}) is before range start ({})",
range.end, range.start
)
.into(),
});
}
Ok(v.data.slice(range.clone()))
Ok(v.data
.slice(check_range(range.clone(), v.data.len())?.clone()))
})
.collect()
} else {


@ -18,7 +18,6 @@ http.workspace = true
# object store crate uses the new version of the http crate
http_1 = { version = "1.1", package = "http" }
humantime.workspace = true
iox_catalog.workspace = true
iox_time.workspace = true
itertools.workspace = true
libc.workspace = true


@ -85,6 +85,16 @@ impl IoxQueryDatafusionConfig {
),
self.use_cached_parquet_loader.to_string(),
);
// NB: need to prevent iox_query from injecting a size hint. It currently does so using a
// bit of a hack, and then strips it out with an additional object store layer. Instead of
// adding the additional layer, we just avoid using the size hint with this configuration.
self.datafusion_config.insert(
format!(
"{prefix}.hint_known_object_size_to_object_store",
prefix = IoxConfigExt::PREFIX
),
false.to_string(),
);
self.datafusion_config
}
}


@ -815,7 +815,7 @@ macro_rules! object_store_config_inner {
return Ok(None);
};
let store = object_store::aws::AmazonS3Builder::from_env()
let store = Arc::new(object_store::aws::AmazonS3Builder::from_env()
// bucket name is ignored by our cache server
.with_bucket_name(self.bucket.as_deref().unwrap_or("placeholder"))
.with_client_options(
@ -833,9 +833,9 @@ macro_rules! object_store_config_inner {
})
.with_skip_signature(true)
.build()
.context(InvalidS3ConfigSnafu)?;
.context(InvalidS3ConfigSnafu)?);
Ok(Some(Arc::new(store)))
Ok(Some(store))
}
/// Build cache store.
@ -858,7 +858,7 @@ macro_rules! object_store_config_inner {
}
}
let remote_store: Arc<DynObjectStore> = match &self.object_store {
let object_store: Arc<DynObjectStore> = match &self.object_store {
None => return Err(ParseError::UnspecifiedObjectStore),
Some(ObjectStoreType::Memory) => {
info!(object_store_type = "Memory", "Object Store");
@ -891,7 +891,7 @@ macro_rules! object_store_config_inner {
Some(ObjectStoreType::File) => self.new_local_file_system()?,
};
Ok(remote_store)
Ok(object_store)
}
fn new_local_file_system(&self) -> Result<Arc<LocalFileSystemWithSortedListOp>, ParseError> {


@ -169,7 +169,7 @@ macro_rules! tokio_rt_config {
TokioRuntimeType::MultiThreadAlt => {
#[cfg(tokio_unstable)]
{
tokio::runtime::Builder::new_multi_thread_alt()
tokio::runtime::Builder::new_multi_thread()
}
#[cfg(not(tokio_unstable))]
{


@ -760,7 +760,9 @@ mod tests {
use influxdb3_write::persister::Persister;
use influxdb3_write::write_buffer::{WriteBufferImpl, WriteBufferImplArgs};
use influxdb3_write::{Precision, WriteBuffer};
use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig, IOxSessionContext};
use iox_query::exec::{
DedicatedExecutor, Executor, ExecutorConfig, IOxSessionContext, PerQueryMemoryPoolConfig,
};
use iox_time::{MockProvider, Time, TimeProvider};
use metric::Registry;
use object_store::ObjectStore;
@ -1065,6 +1067,8 @@ def process_writes(influxdb3_local, table_batches, args=None):
metric_registry: Arc::clone(&metrics),
// Default to 1gb
mem_pool_size: 1024 * 1024 * 1024, // 1024 (b/kb) * 1024 (kb/mb) * 1024 (mb/gb)
heap_memory_limit: None,
per_query_mem_pool_config: PerQueryMemoryPoolConfig::Disabled,
},
DedicatedExecutor::new_testing(),
))


@ -15,7 +15,6 @@ data_types.workspace = true
datafusion_util.workspace = true
influxdb-line-protocol.workspace = true
influxdb_influxql_parser.workspace = true
iox_catalog.workspace = true
iox_http.workspace = true
iox_query.workspace = true
iox_query_params.workspace = true


@ -1468,7 +1468,7 @@ fn token_part_as_bytes(token: &str) -> Result<Vec<u8>, AuthenticationError> {
impl From<authz::Error> for AuthenticationError {
fn from(auth_error: authz::Error) -> Self {
match auth_error {
authz::Error::Forbidden => Self::Forbidden,
authz::Error::Forbidden { .. } => Self::Forbidden,
_ => Self::Unauthenticated,
}
}


@ -53,7 +53,8 @@ use trace_http::metrics::MetricFamily;
use trace_http::metrics::RequestMetrics;
use trace_http::tower::TraceLayer;
const TRACE_SERVER_NAME: &str = "influxdb3_http";
const TRACE_HTTP_SERVER_NAME: &str = "influxdb3_http";
const TRACE_GRPC_SERVER_NAME: &str = "influxdb3_grpc";
#[derive(Debug, Error)]
pub enum Error {
@ -150,23 +151,35 @@ pub async fn serve(
paths_without_authz: &'static Vec<&'static str>,
tcp_listener_file_path: Option<PathBuf>,
) -> Result<()> {
let req_metrics = RequestMetrics::new(
let grpc_metrics = RequestMetrics::new(
Arc::clone(&server.common_state.metrics),
MetricFamily::HttpServer,
MetricFamily::GrpcServer,
);
let trace_layer = TraceLayer::new(
let grpc_trace_layer = TraceLayer::new(
server.common_state.trace_header_parser.clone(),
Arc::new(req_metrics),
Arc::new(grpc_metrics),
server.common_state.trace_collector().clone(),
TRACE_SERVER_NAME,
TRACE_GRPC_SERVER_NAME,
trace_http::tower::ServiceProtocol::Grpc,
);
if let (Some(key_file), Some(cert_file)) = (&server.key_file, &server.cert_file) {
let grpc_service = trace_layer.clone().layer(make_flight_server(
let grpc_service = grpc_trace_layer.layer(make_flight_server(
Arc::clone(&server.http.query_executor),
Some(server.authorizer()),
));
let http_metrics = RequestMetrics::new(
Arc::clone(&server.common_state.metrics),
MetricFamily::HttpServer,
);
let http_trace_layer = TraceLayer::new(
server.common_state.trace_header_parser.clone(),
Arc::new(http_metrics),
server.common_state.trace_collector().clone(),
TRACE_HTTP_SERVER_NAME,
trace_http::tower::ServiceProtocol::Http,
);
if let (Some(key_file), Some(cert_file)) = (&server.key_file, &server.cert_file) {
let rest_service = hyper::service::make_service_fn(|_| {
let http_server = Arc::clone(&server.http);
let service = service_fn(move |req: hyper::Request<hyper::Body>| {
@ -177,7 +190,7 @@ pub async fn serve(
paths_without_authz,
)
});
let service = trace_layer.layer(service);
let service = http_trace_layer.layer(service);
futures::future::ready(Ok::<_, Infallible>(service))
});
@ -227,7 +240,7 @@ pub async fn serve(
.with_graceful_shutdown(shutdown.cancelled())
.await?;
} else {
let grpc_service = trace_layer.clone().layer(make_flight_server(
let grpc_service = grpc_trace_layer.layer(make_flight_server(
Arc::clone(&server.http.query_executor),
Some(server.authorizer()),
));
@ -242,7 +255,7 @@ pub async fn serve(
paths_without_authz,
)
});
let service = trace_layer.layer(service);
let service = http_trace_layer.layer(service);
futures::future::ready(Ok::<_, Infallible>(service))
});
@ -289,7 +302,7 @@ mod tests {
use influxdb3_write::persister::Persister;
use influxdb3_write::write_buffer::persisted_files::PersistedFiles;
use influxdb3_write::{Bufferer, WriteBuffer};
use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig};
use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig, PerQueryMemoryPoolConfig};
use iox_time::{MockProvider, Time};
use object_store::DynObjectStore;
use parquet_file::storage::{ParquetStorage, StorageId};
@ -829,6 +842,8 @@ mod tests {
.collect(),
metric_registry: Arc::clone(&metrics),
mem_pool_size: usize::MAX,
per_query_mem_pool_config: PerQueryMemoryPoolConfig::Disabled,
heap_memory_limit: None,
},
DedicatedExecutor::new_testing(),
));
@ -904,6 +919,7 @@ mod tests {
telemetry_store: Arc::clone(&sample_telem_store),
sys_events_store: Arc::clone(&sys_events_store),
started_with_auth: false,
time_provider: Arc::clone(&time_provider) as _,
}));
// bind to port 0 will assign a random available port:


@ -35,6 +35,7 @@ use iox_query::query_log::StateReceived;
use iox_query::query_log::{QueryCompletedToken, QueryLogEntries};
use iox_query::{QueryChunk, QueryNamespace};
use iox_query_params::StatementParams;
use iox_time::TimeProvider;
use metric::Registry;
use observability_deps::tracing::{debug, info};
use std::any::Any;
@ -70,6 +71,7 @@ pub struct CreateQueryExecutorArgs {
pub write_buffer: Arc<dyn WriteBuffer>,
pub exec: Arc<Executor>,
pub metrics: Arc<Registry>,
pub time_provider: Arc<dyn TimeProvider>,
pub datafusion_config: Arc<HashMap<String, String>>,
pub query_log_size: usize,
pub telemetry_store: Arc<TelemetryStore>,
@ -89,6 +91,7 @@ impl QueryExecutorImpl {
telemetry_store,
sys_events_store,
started_with_auth,
time_provider,
}: CreateQueryExecutorArgs,
) -> Self {
let semaphore_metrics = Arc::new(AsyncSemaphoreMetrics::new(
@ -97,10 +100,7 @@ impl QueryExecutorImpl {
));
let query_execution_semaphore =
Arc::new(semaphore_metrics.new_semaphore(Semaphore::MAX_PERMITS));
let query_log = Arc::new(QueryLog::new(
query_log_size,
Arc::new(iox_time::SystemProvider::new()),
));
let query_log = Arc::new(QueryLog::new(query_log_size, time_provider, &metrics));
Self {
catalog,
write_buffer,
@ -271,6 +271,8 @@ async fn query_database_sql(
"sql",
Box::new(query.to_string()),
params.clone(),
// NB: do we need to provide an auth ID?
None,
);
// NOTE - we use the default query configuration on the IOxSessionContext here:
@ -325,6 +327,8 @@ async fn query_database_influxql(
"influxql",
Box::new(query_str.to_string()),
params.clone(),
// NB: do we need to provide an auth ID?
None,
);
let ctx = db.new_query_context(span_ctx, Default::default());
@ -556,6 +560,7 @@ impl QueryNamespace for Database {
query_type: &'static str,
query_text: QueryText,
query_params: StatementParams,
auth_id: Option<String>,
) -> QueryCompletedToken<StateReceived> {
let trace_id = span_ctx.map(|ctx| ctx.trace_id);
let namespace_name: Arc<str> = Arc::from("influxdb3 oss");
@ -565,6 +570,7 @@ impl QueryNamespace for Database {
query_type,
query_text,
query_params,
auth_id,
trace_id,
)
}
@ -745,7 +751,7 @@ impl TableProvider for QueryTable {
#[cfg(test)]
mod tests {
use std::{num::NonZeroUsize, sync::Arc, time::Duration};
use std::{collections::HashMap, num::NonZeroUsize, sync::Arc, time::Duration};
use crate::query_executor::QueryExecutorImpl;
use arrow::array::RecordBatch;
@ -767,7 +773,7 @@ mod tests {
persister::Persister,
write_buffer::{WriteBufferImpl, WriteBufferImplArgs, persisted_files::PersistedFiles},
};
use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig};
use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig, PerQueryMemoryPoolConfig};
use iox_time::{MockProvider, Time};
use metric::Registry;
use object_store::{ObjectStore, local::LocalFileSystem};
@ -793,6 +799,8 @@ mod tests {
metric_registry: Arc::clone(&metrics),
// Default to 1gb
mem_pool_size: 1024 * 1024 * 1024, // 1024 (b/kb) * 1024 (kb/mb) * 1024 (mb/gb)
per_query_mem_pool_config: PerQueryMemoryPoolConfig::Disabled,
heap_memory_limit: None,
},
DedicatedExecutor::new_testing(),
))
@ -875,7 +883,15 @@ mod tests {
)));
let write_buffer: Arc<dyn WriteBuffer> = write_buffer_impl;
let metrics = Arc::new(Registry::new());
let datafusion_config = Arc::new(Default::default());
let mut datafusion_config = HashMap::new();
// NB: need to prevent iox_query from injecting a size hint. It currently does so using a
// bit of a hack, and then strips it out with an additional object store layer. Instead of
// adding the additional layer, we just avoid using the size hint with this configuration.
datafusion_config.insert(
"iox.hint_known_object_size_to_object_store".to_string(),
false.to_string(),
);
let datafusion_config = Arc::new(datafusion_config);
let query_executor = QueryExecutorImpl::new(CreateQueryExecutorArgs {
catalog: write_buffer.catalog(),
write_buffer: Arc::clone(&write_buffer),
@ -886,6 +902,7 @@ mod tests {
telemetry_store,
sys_events_store: Arc::clone(&sys_events_store),
started_with_auth,
time_provider: Arc::clone(&time_provider) as _,
});
(


@ -93,18 +93,24 @@ impl SchemaExec {
}
}
/// This function creates the cache object that stores the plan properties such as equivalence properties, partitioning, ordering, etc.
/// This function creates the cache object that stores the plan properties such as equivalence
/// properties, partitioning, ordering, etc.
fn compute_properties(input: &Arc<dyn ExecutionPlan>, schema: SchemaRef) -> PlanProperties {
let eq_properties = match input.properties().output_ordering() {
None => EquivalenceProperties::new(schema),
Some(output_odering) => {
EquivalenceProperties::new_with_orderings(schema, &[output_odering.to_vec()])
EquivalenceProperties::new_with_orderings(schema, &[output_odering.clone()])
}
};
let output_partitioning = input.output_partitioning().clone();
PlanProperties::new(eq_properties, output_partitioning, input.execution_mode())
PlanProperties::new(
eq_properties,
output_partitioning,
input.pipeline_behavior(),
input.boundedness(),
)
}
}


@ -276,6 +276,10 @@ impl ObjectStore for SynchronizedObjectStore {
location: &Path,
options: GetOptions,
) -> object_store::Result<GetResult> {
if let Some((inbound, outbound)) = &self.get_notifies {
outbound.notify_one();
inbound.notified().await;
}
self.inner.get_opts(location, options).await
}


@ -14,7 +14,6 @@ data_types.workspace = true
datafusion_util.workspace = true
executor.workspace = true
influxdb-line-protocol.workspace = true
iox_catalog.workspace = true
iox_http.workspace = true
iox_query.workspace = true
iox_time.workspace = true


@ -676,7 +676,7 @@ mod tests {
use influxdb3_test_helpers::object_store::RequestCountedObjectStore;
use influxdb3_types::http::LastCacheSize;
use influxdb3_wal::{Gen1Duration, SnapshotSequenceNumber, WalFileSequenceNumber};
use iox_query::exec::{Executor, ExecutorConfig, IOxSessionContext};
use iox_query::exec::{Executor, ExecutorConfig, IOxSessionContext, PerQueryMemoryPoolConfig};
use iox_time::{MockProvider, Time};
use metric::{Attributes, Metric, U64Counter};
use metrics::{
@ -3127,6 +3127,75 @@ mod tests {
}
}
#[test_log::test(tokio::test)]
async fn test_parquet_cache_hits() {
let object_store = Arc::new(InMemory::new());
let wal_config = WalConfig {
gen1_duration: influxdb3_wal::Gen1Duration::new_1m(),
max_write_buffer_size: 100,
flush_interval: Duration::from_millis(10),
snapshot_size: 1,
};
let (wb, ctx, metrics) = setup_with_metrics_and_parquet_cache(
Time::from_timestamp_nanos(0),
object_store,
wal_config,
)
.await;
// Do enough writes to trigger a snapshot:
do_writes(
"foo",
wb.as_ref(),
&[
// each write carries a distinct `tag` value
TestWrite {
lp: "bar,tag=a val=1",
time_seconds: 1,
},
TestWrite {
lp: "bar,tag=b val=2",
time_seconds: 2,
},
TestWrite {
lp: "bar,tag=c val=3",
time_seconds: 3,
},
],
)
.await;
verify_snapshot_count(1, &wb.persister).await;
let instrument = metrics
.get_instrument::<Metric<U64Counter>>("influxdb3_parquet_cache_access")
.unwrap();
let cached_count = instrument.observer(&[("status", "cached")]).fetch();
// We haven't queried anything so nothing has hit the cache yet:
assert_eq!(0, cached_count);
let batches = wb.get_record_batches_unchecked("foo", "bar", &ctx).await;
assert_batches_sorted_eq!(
[
"+-----+----------------------+-----+",
"| tag | time | val |",
"+-----+----------------------+-----+",
"| a | 1970-01-01T00:00:01Z | 1.0 |",
"| b | 1970-01-01T00:00:02Z | 2.0 |",
"| c | 1970-01-01T00:00:03Z | 3.0 |",
"+-----+----------------------+-----+",
],
&batches
);
let cached_count = instrument.observer(&[("status", "cached")]).fetch();
// NB: although there should only be a single file in the store, datafusion may make more
// than one request to fetch the file in chunks, which is why there is more than a single
// cache hit:
assert_eq!(3, cached_count);
}
struct TestWrite<LP> {
lp: LP,
time_seconds: i64,
@ -3234,6 +3303,16 @@ mod tests {
(buf, metrics)
}
async fn setup_with_metrics_and_parquet_cache(
start: Time,
object_store: Arc<dyn ObjectStore>,
wal_config: WalConfig,
) -> (Arc<WriteBufferImpl>, IOxSessionContext, Arc<Registry>) {
let (buf, ctx, _time_provider, metrics) =
setup_inner(start, object_store, wal_config, true).await;
(buf, ctx, metrics)
}
async fn setup_inner(
start: Time,
object_store: Arc<dyn ObjectStore>,
@ -3251,7 +3330,7 @@ mod tests {
let (object_store, parquet_cache) = test_cached_obj_store_and_oracle(
object_store,
Arc::clone(&time_provider),
Default::default(),
Arc::clone(&metric_registry),
);
(object_store, Some(parquet_cache))
} else {
@ -3398,6 +3477,8 @@ mod tests {
metric_registry: Arc::clone(&metrics),
// Default to 1gb
mem_pool_size: 1024 * 1024 * 1024, // 1024 (b/kb) * 1024 (kb/mb) * 1024 (mb/gb)
per_query_mem_pool_config: PerQueryMemoryPoolConfig::Disabled,
heap_memory_limit: None,
},
DedicatedExecutor::new_testing(),
))


@ -600,7 +600,7 @@ mod tests {
use datafusion_util::config::register_iox_object_store;
use executor::{DedicatedExecutor, register_current_runtime_for_io};
use influxdb3_wal::{Gen1Duration, SnapshotSequenceNumber, WalFileSequenceNumber};
use iox_query::exec::ExecutorConfig;
use iox_query::exec::{ExecutorConfig, PerQueryMemoryPoolConfig};
use iox_time::{MockProvider, Time, TimeProvider};
use object_store::ObjectStore;
use object_store::memory::InMemory;
@ -626,6 +626,8 @@ mod tests {
metric_registry: Arc::clone(&metrics),
// Default to 1gb
mem_pool_size: 1024 * 1024 * 1024, // 1024 (b/kb) * 1024 (kb/mb) * 1024 (mb/gb)
per_query_mem_pool_config: PerQueryMemoryPoolConfig::Disabled,
heap_memory_limit: None,
},
DedicatedExecutor::new_testing(),
));