chore: `influxdb3_core` update (#24798)

chore: sync in latest core changes
pull/24800/head
Trevor Hilton 2024-03-21 10:29:56 -04:00 committed by GitHub
parent 84b85a9b1c
commit caae9ca9f2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 364 additions and 334 deletions

511
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -47,8 +47,8 @@ chrono = "0.4"
clap = { version = "4", features = ["derive", "env", "string"] }
crc32fast = "1.2.0"
crossbeam-channel = "0.5.11"
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "91f3eb2e5430d23e2b551e66732bec1a3a575971" }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev = "91f3eb2e5430d23e2b551e66732bec1a3a575971" }
datafusion = { git = "https://github.com/erratic-pattern/arrow-datafusion.git", rev = "5965d670c88bdfa1fb74f32fd5021d400838dade" }
datafusion-proto = { git = "https://github.com/erratic-pattern/arrow-datafusion.git", rev = "5965d670c88bdfa1fb74f32fd5021d400838dade" }
dotenvy = "0.15.7"
flate2 = "1.0.27"
futures = "0.3.28"
@@ -60,7 +60,7 @@ hyper = "0.14"
libc = { version = "0.2" }
mockito = { version = "1.2.0", default-features = false }
num_cpus = "1.16.0"
object_store = "0.9.0"
object_store = "0.9.1"
once_cell = { version = "1.18", features = ["parking_lot"] }
parking_lot = "0.12.1"
parquet = { version = "50.0.0", features = ["object_store"] }
@@ -96,36 +96,37 @@ urlencoding = "1.1"
uuid = { version = "1", features = ["v4"] }
# Core.git crates we depend on
arrow_util = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2"}
authz = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2", features = ["http"] }
clap_blocks = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
data_types = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
datafusion_util = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
influxdb-line-protocol = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
influxdb_influxql_parser = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
influxdb_iox_client = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
iox_catalog = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
iox_query = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
iox_query_params = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
iox_query_influxql = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
iox_time = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
ioxd_common = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
metric = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
metric_exporters = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
observability_deps = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
panic_logging = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
parquet_file = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
schema = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
service_common = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
service_grpc_flight = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
test_helpers = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
test_helpers_end_to_end = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
tokio_metrics_bridge = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
trace = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
trace_exporters = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
trace_http = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
tracker = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
trogging = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2", default-features = true, features = ["clap"] }
arrow_util = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd238811b995ddf949a4c7546f4c59f25bd451cf"}
authz = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd238811b995ddf949a4c7546f4c59f25bd451cf", features = ["http"] }
clap_blocks = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd238811b995ddf949a4c7546f4c59f25bd451cf" }
data_types = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd238811b995ddf949a4c7546f4c59f25bd451cf" }
datafusion_util = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd238811b995ddf949a4c7546f4c59f25bd451cf" }
influxdb-line-protocol = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd238811b995ddf949a4c7546f4c59f25bd451cf" }
influxdb_influxql_parser = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd238811b995ddf949a4c7546f4c59f25bd451cf" }
influxdb_iox_client = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd238811b995ddf949a4c7546f4c59f25bd451cf" }
iox_catalog = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd238811b995ddf949a4c7546f4c59f25bd451cf" }
ioxd_common = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd238811b995ddf949a4c7546f4c59f25bd451cf" }
iox_http = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd238811b995ddf949a4c7546f4c59f25bd451cf" }
iox_query = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd238811b995ddf949a4c7546f4c59f25bd451cf" }
iox_query_params = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd238811b995ddf949a4c7546f4c59f25bd451cf" }
iox_query_influxql = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd238811b995ddf949a4c7546f4c59f25bd451cf" }
iox_time = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd238811b995ddf949a4c7546f4c59f25bd451cf" }
metric = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd238811b995ddf949a4c7546f4c59f25bd451cf" }
metric_exporters = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd238811b995ddf949a4c7546f4c59f25bd451cf" }
observability_deps = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd238811b995ddf949a4c7546f4c59f25bd451cf" }
panic_logging = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd238811b995ddf949a4c7546f4c59f25bd451cf" }
parquet_file = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd238811b995ddf949a4c7546f4c59f25bd451cf" }
schema = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd238811b995ddf949a4c7546f4c59f25bd451cf" }
service_common = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd238811b995ddf949a4c7546f4c59f25bd451cf" }
service_grpc_flight = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd238811b995ddf949a4c7546f4c59f25bd451cf" }
test_helpers = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd238811b995ddf949a4c7546f4c59f25bd451cf" }
test_helpers_end_to_end = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd238811b995ddf949a4c7546f4c59f25bd451cf" }
tokio_metrics_bridge = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd238811b995ddf949a4c7546f4c59f25bd451cf" }
trace = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd238811b995ddf949a4c7546f4c59f25bd451cf" }
trace_exporters = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd238811b995ddf949a4c7546f4c59f25bd451cf" }
trace_http = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd238811b995ddf949a4c7546f4c59f25bd451cf" }
tracker = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd238811b995ddf949a4c7546f4c59f25bd451cf" }
trogging = { git = "https://github.com/influxdata/influxdb3_core", rev = "fd238811b995ddf949a4c7546f4c59f25bd451cf", default-features = true, features = ["clap"] }
[workspace.lints.rust]
rust_2018_idioms = "deny"

View File

@@ -16,7 +16,7 @@ use influxdb3_write::persister::PersisterImpl;
use influxdb3_write::wal::WalImpl;
use influxdb3_write::write_buffer::WriteBufferImpl;
use influxdb3_write::SegmentDuration;
use iox_query::exec::{Executor, ExecutorConfig, ExecutorType};
use iox_query::exec::{Executor, ExecutorConfig};
use iox_time::SystemProvider;
use ioxd_common::reexport::trace_http::ctx::TraceHeaderParser;
use object_store::DynObjectStore;
@@ -233,17 +233,20 @@ pub async fn command(config: Config) -> Result<()> {
info!(%num_threads, "Creating shared query executor");
let parquet_store =
ParquetStorage::new(Arc::clone(&object_store), StorageId::from("influxdb3"));
let exec = Arc::new(Executor::new_with_config(ExecutorConfig {
num_threads,
target_query_partitions: num_threads,
object_stores: [&parquet_store]
.into_iter()
.map(|store| (store.id(), Arc::clone(store.object_store())))
.collect(),
metric_registry: Arc::clone(&metrics),
mem_pool_size: config.exec_mem_pool_bytes.bytes(),
}));
let runtime_env = exec.new_context(ExecutorType::Query).inner().runtime_env();
let exec = Arc::new(Executor::new_with_config(
"datafusion",
ExecutorConfig {
num_threads,
target_query_partitions: num_threads,
object_stores: [&parquet_store]
.into_iter()
.map(|store| (store.id(), Arc::clone(store.object_store())))
.collect(),
metric_registry: Arc::clone(&metrics),
mem_pool_size: config.exec_mem_pool_bytes.bytes(),
},
));
let runtime_env = exec.new_context().inner().runtime_env();
register_iox_object_store(runtime_env, parquet_store.id(), Arc::clone(&object_store));
let trace_header_parser = TraceHeaderParser::new()

View File

@@ -255,16 +255,19 @@ mod tests {
let parquet_store =
ParquetStorage::new(Arc::clone(&object_store), StorageId::from("influxdb3"));
let num_threads = NonZeroUsize::new(2).unwrap();
let exec = Arc::new(Executor::new_with_config(ExecutorConfig {
num_threads,
target_query_partitions: NonZeroUsize::new(1).unwrap(),
object_stores: [&parquet_store]
.into_iter()
.map(|store| (store.id(), Arc::clone(store.object_store())))
.collect(),
metric_registry: Arc::clone(&metrics),
mem_pool_size: usize::MAX,
}));
let exec = Arc::new(Executor::new_with_config(
"datafusion",
ExecutorConfig {
num_threads,
target_query_partitions: NonZeroUsize::new(1).unwrap(),
object_stores: [&parquet_store]
.into_iter()
.map(|store| (store.id(), Arc::clone(store.object_store())))
.collect(),
metric_registry: Arc::clone(&metrics),
mem_pool_size: usize::MAX,
},
));
let persister = Arc::new(PersisterImpl::new(Arc::clone(&object_store)));
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
@@ -391,16 +394,19 @@ mod tests {
let parquet_store =
ParquetStorage::new(Arc::clone(&object_store), StorageId::from("influxdb3"));
let num_threads = NonZeroUsize::new(2).unwrap();
let exec = Arc::new(Executor::new_with_config(ExecutorConfig {
num_threads,
target_query_partitions: NonZeroUsize::new(1).unwrap(),
object_stores: [&parquet_store]
.into_iter()
.map(|store| (store.id(), Arc::clone(store.object_store())))
.collect(),
metric_registry: Arc::clone(&metrics),
mem_pool_size: usize::MAX,
}));
let exec = Arc::new(Executor::new_with_config(
"datafusion",
ExecutorConfig {
num_threads,
target_query_partitions: NonZeroUsize::new(1).unwrap(),
object_stores: [&parquet_store]
.into_iter()
.map(|store| (store.id(), Arc::clone(store.object_store())))
.collect(),
metric_registry: Arc::clone(&metrics),
mem_pool_size: usize::MAX,
},
));
let persister = Arc::new(PersisterImpl::new(Arc::clone(&object_store)));
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
@@ -563,16 +569,19 @@ mod tests {
let parquet_store =
ParquetStorage::new(Arc::clone(&object_store), StorageId::from("influxdb3"));
let num_threads = NonZeroUsize::new(2).unwrap();
let exec = Arc::new(Executor::new_with_config(ExecutorConfig {
num_threads,
target_query_partitions: NonZeroUsize::new(1).unwrap(),
object_stores: [&parquet_store]
.into_iter()
.map(|store| (store.id(), Arc::clone(store.object_store())))
.collect(),
metric_registry: Arc::clone(&metrics),
mem_pool_size: usize::MAX,
}));
let exec = Arc::new(Executor::new_with_config(
"datafusion",
ExecutorConfig {
num_threads,
target_query_partitions: NonZeroUsize::new(1).unwrap(),
object_stores: [&parquet_store]
.into_iter()
.map(|store| (store.id(), Arc::clone(store.object_store())))
.collect(),
metric_registry: Arc::clone(&metrics),
mem_pool_size: usize::MAX,
},
));
let persister = Arc::new(PersisterImpl::new(Arc::clone(&object_store)));
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(
1708473607000000000,

View File

@@ -21,7 +21,7 @@ use influxdb3_write::{
catalog::{Catalog, DatabaseSchema},
WriteBuffer,
};
use iox_query::exec::{Executor, ExecutorType, IOxSessionContext, QueryConfig};
use iox_query::exec::{Executor, IOxSessionContext, QueryConfig};
use iox_query::frontend::sql::SqlQueryPlanner;
use iox_query::provider::ProviderBuilder;
use iox_query::query_log::QueryCompletedToken;
@@ -112,6 +112,8 @@ impl<W: WriteBuffer> QueryExecutor for QueryExecutorImpl<W> {
external_span_ctx.as_ref().map(RequestLogContext::ctx),
"sql",
Box::new(q.to_string()),
// TODO - ignoring params for now:
StatementParams::default(),
);
info!("plan");
@@ -129,7 +131,7 @@ impl<W: WriteBuffer> QueryExecutor for QueryExecutorImpl<W> {
}
}
.map_err(Error::QueryPlanning)?;
let token = token.planned(Arc::clone(&plan));
let token = token.planned(&ctx, Arc::clone(&plan));
// TODO: Enforce concurrency limit here
let token = token.permit();
@@ -341,6 +343,7 @@ impl<B: WriteBuffer> QueryNamespace for QueryDatabase<B> {
span_ctx: Option<&SpanContext>,
query_type: &'static str,
query_text: QueryText,
query_params: StatementParams,
) -> QueryCompletedToken<StateReceived> {
let trace_id = span_ctx.map(|ctx| ctx.trace_id);
let namespace_name: Arc<str> = Arc::from("influxdb3 edge");
@@ -349,6 +352,7 @@ impl<B: WriteBuffer> QueryNamespace for QueryDatabase<B> {
namespace_name,
query_type,
query_text,
query_params,
trace_id,
)
}
@@ -367,7 +371,7 @@ impl<B: WriteBuffer> QueryNamespace for QueryDatabase<B> {
let mut cfg = self
.exec
.new_execution_config(ExecutorType::Query)
.new_execution_config()
.with_default_catalog(Arc::new(qdb))
.with_span_context(span_ctx);
@@ -415,8 +419,8 @@ impl<B: WriteBuffer> SchemaProvider for QueryDatabase<B> {
self.db_schema.table_names()
}
async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
self.query_table(name).await.map(|qt| qt as _)
async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
Ok(self.query_table(name).await.map(|qt| qt as _))
}
fn table_exist(&self, name: &str) -> bool {

View File

@@ -1,6 +1,7 @@
use arrow::array::RecordBatch;
use data_types::{ChunkId, ChunkOrder, TransitionPartitionId};
use datafusion::common::Statistics;
use iox_query::chunk_statistics::ChunkStatistics;
use iox_query::{QueryChunk, QueryChunkData};
use parquet_file::storage::ParquetExecInput;
use schema::sort::SortKey;
@@ -12,7 +13,7 @@ use std::sync::Arc;
pub struct BufferChunk {
pub(crate) batches: Vec<RecordBatch>,
pub(crate) schema: Schema,
pub(crate) stats: Arc<Statistics>,
pub(crate) stats: Arc<ChunkStatistics>,
pub(crate) partition_id: data_types::partition::TransitionPartitionId,
pub(crate) sort_key: Option<SortKey>,
pub(crate) id: data_types::ChunkId,
@@ -21,7 +22,7 @@ pub struct BufferChunk {
impl QueryChunk for BufferChunk {
fn stats(&self) -> Arc<Statistics> {
Arc::clone(&self.stats)
Arc::clone(&self.stats.statistics())
}
fn schema(&self) -> &Schema {
@@ -64,7 +65,7 @@ impl QueryChunk for BufferChunk {
#[derive(Debug)]
pub struct ParquetChunk {
pub(crate) schema: Schema,
pub(crate) stats: Arc<Statistics>,
pub(crate) stats: Arc<ChunkStatistics>,
pub(crate) partition_id: TransitionPartitionId,
pub(crate) sort_key: Option<SortKey>,
pub(crate) id: ChunkId,
@@ -74,7 +75,7 @@ pub struct ParquetChunk {
impl QueryChunk for ParquetChunk {
fn stats(&self) -> Arc<Statistics> {
Arc::clone(&self.stats)
Arc::clone(&self.stats.statistics())
}
fn schema(&self) -> &Schema {

View File

@@ -250,6 +250,7 @@ impl<W: Wal, T: TimeProvider, P: Persister> WriteBufferImpl<W, T, P> {
e_tag: None,
version: None,
},
object_store: self.persister.object_store(),
};
let parquet_chunk = ParquetChunk {