chore: sync to latest core (#25284)

pull/25304/head
Trevor Hilton 2024-09-06 10:49:38 -07:00 committed by GitHub
parent fb9d7d02f3
commit ad2ca83d72
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 855 additions and 371 deletions

1052
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -35,12 +35,12 @@ license = "MIT OR Apache-2.0"
[workspace.dependencies]
anyhow = "1.0"
arrow = { version = "52.1.0", features = ["prettyprint", "chrono-tz"] }
arrow-array = "52.1.0"
arrow-buffer = "52.1.0"
arrow-csv = "52.1.0"
arrow-flight = { version = "52.1.0", features = ["flight-sql-experimental"] }
arrow-json = "52.1.0"
arrow-schema = "52.1.0"
arrow-array = "52.2.0"
arrow-buffer = "52.2.0"
arrow-csv = "52.2.0"
arrow-flight = { version = "52.2.0", features = ["flight-sql-experimental"] }
arrow-json = "52.2.0"
arrow-schema = "52.2.0"
assert_cmd = "2.0.14"
async-trait = "0.1"
backtrace = "0.3"
@ -51,8 +51,8 @@ chrono = "0.4"
clap = { version = "4", features = ["derive", "env", "string"] }
crc32fast = "1.2.0"
crossbeam-channel = "0.5.11"
datafusion = { git = "https://github.com/apache/datafusion.git", rev = "a64df83502821f18067fb4ff65dd217815b305c9" }
datafusion-proto = { git = "https://github.com/apache/datafusion.git", rev = "a64df83502821f18067fb4ff65dd217815b305c9" }
datafusion = { git = "https://github.com/influxdata/arrow-datafusion.git", rev = "5de0c3577fd30dcf9213f428222a29efae789807" }
datafusion-proto = { git = "https://github.com/influxdata/arrow-datafusion.git", rev = "5de0c3577fd30dcf9213f428222a29efae789807" }
csv = "1.3.0"
dotenvy = "0.15.7"
flate2 = "1.0.27"
@ -71,7 +71,7 @@ mockito = { version = "1.4.0", default-features = false }
num_cpus = "1.16.0"
object_store = "0.10.2"
parking_lot = "0.12.1"
parquet = { version = "52.1.0", features = ["object_store"] }
parquet = { version = "52.2.0", features = ["object_store"] }
pbjson = "0.6.0"
pbjson-build = "0.6.2"
pbjson-types = "0.6.0"
@ -84,7 +84,9 @@ rand = "0.8.5"
reqwest = { version = "0.11.24", default-features = false, features = ["rustls-tls", "stream", "json"] }
secrecy = "0.8.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
# serde_json is set to 1.0.127 to prevent a conflict with core. If core's requirement gets updated
# upstream, this could be changed back to 1.0.
serde_json = "1.0.127"
serde_urlencoded = "0.7.0"
serde_with = "3.8.1"
sha2 = "0.10.8"
@ -93,7 +95,7 @@ sqlparser = "0.48.0"
sysinfo = "0.30.8"
test-log = { version = "0.2.16", features = ["trace"] }
thiserror = "1.0"
tokio = { version = "1.35", features = ["full"] }
tokio = { version = "1.40", features = ["full"] }
tokio-util = "0.7.9"
tonic = { version = "0.11.0", features = ["tls", "tls-roots"] }
tonic-build = "0.11.0"
@ -106,39 +108,36 @@ urlencoding = "1.1"
uuid = { version = "1", features = ["v4"] }
# Core.git crates we depend on
# Currently influxdb is pointed at a revision from the experimental branch
# in influxdb3_core, hiltontj/17-june-2024-iox-sync-exp, instead of main.
# See https://github.com/influxdata/influxdb3_core/pull/23
arrow_util = { git = "https://github.com/influxdata/influxdb3_core", rev = "1a25527986e2fdfbf018c89d49112eb9b02bb87a"}
authz = { git = "https://github.com/influxdata/influxdb3_core", rev = "1a25527986e2fdfbf018c89d49112eb9b02bb87a", features = ["http"] }
clap_blocks = { git = "https://github.com/influxdata/influxdb3_core", rev = "1a25527986e2fdfbf018c89d49112eb9b02bb87a" }
data_types = { git = "https://github.com/influxdata/influxdb3_core", rev = "1a25527986e2fdfbf018c89d49112eb9b02bb87a" }
datafusion_util = { git = "https://github.com/influxdata/influxdb3_core", rev = "1a25527986e2fdfbf018c89d49112eb9b02bb87a" }
influxdb-line-protocol = { git = "https://github.com/influxdata/influxdb3_core", rev = "1a25527986e2fdfbf018c89d49112eb9b02bb87a" }
influxdb_influxql_parser = { git = "https://github.com/influxdata/influxdb3_core", rev = "1a25527986e2fdfbf018c89d49112eb9b02bb87a" }
influxdb_iox_client = { git = "https://github.com/influxdata/influxdb3_core", rev = "1a25527986e2fdfbf018c89d49112eb9b02bb87a" }
iox_catalog = { git = "https://github.com/influxdata/influxdb3_core", rev = "1a25527986e2fdfbf018c89d49112eb9b02bb87a" }
iox_http = { git = "https://github.com/influxdata/influxdb3_core", rev = "1a25527986e2fdfbf018c89d49112eb9b02bb87a" }
iox_query = { git = "https://github.com/influxdata/influxdb3_core", rev = "1a25527986e2fdfbf018c89d49112eb9b02bb87a" }
iox_query_params = { git = "https://github.com/influxdata/influxdb3_core", rev = "1a25527986e2fdfbf018c89d49112eb9b02bb87a" }
iox_query_influxql = { git = "https://github.com/influxdata/influxdb3_core", rev = "1a25527986e2fdfbf018c89d49112eb9b02bb87a" }
iox_system_tables = { git = "https://github.com/influxdata/influxdb3_core", rev = "1a25527986e2fdfbf018c89d49112eb9b02bb87a" }
iox_time = { git = "https://github.com/influxdata/influxdb3_core", rev = "1a25527986e2fdfbf018c89d49112eb9b02bb87a" }
metric = { git = "https://github.com/influxdata/influxdb3_core", rev = "1a25527986e2fdfbf018c89d49112eb9b02bb87a" }
metric_exporters = { git = "https://github.com/influxdata/influxdb3_core", rev = "1a25527986e2fdfbf018c89d49112eb9b02bb87a" }
observability_deps = { git = "https://github.com/influxdata/influxdb3_core", rev = "1a25527986e2fdfbf018c89d49112eb9b02bb87a" }
panic_logging = { git = "https://github.com/influxdata/influxdb3_core", rev = "1a25527986e2fdfbf018c89d49112eb9b02bb87a" }
parquet_file = { git = "https://github.com/influxdata/influxdb3_core", rev = "1a25527986e2fdfbf018c89d49112eb9b02bb87a" }
schema = { git = "https://github.com/influxdata/influxdb3_core", rev = "1a25527986e2fdfbf018c89d49112eb9b02bb87a" }
service_common = { git = "https://github.com/influxdata/influxdb3_core", rev = "1a25527986e2fdfbf018c89d49112eb9b02bb87a" }
service_grpc_flight = { git = "https://github.com/influxdata/influxdb3_core", rev = "1a25527986e2fdfbf018c89d49112eb9b02bb87a" }
test_helpers = { git = "https://github.com/influxdata/influxdb3_core", rev = "1a25527986e2fdfbf018c89d49112eb9b02bb87a" }
tokio_metrics_bridge = { git = "https://github.com/influxdata/influxdb3_core", rev = "1a25527986e2fdfbf018c89d49112eb9b02bb87a" }
trace = { git = "https://github.com/influxdata/influxdb3_core", rev = "1a25527986e2fdfbf018c89d49112eb9b02bb87a" }
trace_exporters = { git = "https://github.com/influxdata/influxdb3_core", rev = "1a25527986e2fdfbf018c89d49112eb9b02bb87a" }
trace_http = { git = "https://github.com/influxdata/influxdb3_core", rev = "1a25527986e2fdfbf018c89d49112eb9b02bb87a" }
tracker = { git = "https://github.com/influxdata/influxdb3_core", rev = "1a25527986e2fdfbf018c89d49112eb9b02bb87a" }
trogging = { git = "https://github.com/influxdata/influxdb3_core", rev = "1a25527986e2fdfbf018c89d49112eb9b02bb87a", default-features = true, features = ["clap"] }
arrow_util = { git = "https://github.com/influxdata/influxdb3_core", rev = "1d5011bde4c343890bb58aa77415b20cb900a4a8"}
authz = { git = "https://github.com/influxdata/influxdb3_core", rev = "1d5011bde4c343890bb58aa77415b20cb900a4a8", features = ["http"] }
clap_blocks = { git = "https://github.com/influxdata/influxdb3_core", rev = "1d5011bde4c343890bb58aa77415b20cb900a4a8" }
data_types = { git = "https://github.com/influxdata/influxdb3_core", rev = "1d5011bde4c343890bb58aa77415b20cb900a4a8" }
datafusion_util = { git = "https://github.com/influxdata/influxdb3_core", rev = "1d5011bde4c343890bb58aa77415b20cb900a4a8" }
influxdb-line-protocol = { git = "https://github.com/influxdata/influxdb3_core", rev = "1d5011bde4c343890bb58aa77415b20cb900a4a8", features = ["v3"] }
influxdb_influxql_parser = { git = "https://github.com/influxdata/influxdb3_core", rev = "1d5011bde4c343890bb58aa77415b20cb900a4a8" }
influxdb_iox_client = { git = "https://github.com/influxdata/influxdb3_core", rev = "1d5011bde4c343890bb58aa77415b20cb900a4a8" }
iox_catalog = { git = "https://github.com/influxdata/influxdb3_core", rev = "1d5011bde4c343890bb58aa77415b20cb900a4a8" }
iox_http = { git = "https://github.com/influxdata/influxdb3_core", rev = "1d5011bde4c343890bb58aa77415b20cb900a4a8" }
iox_query = { git = "https://github.com/influxdata/influxdb3_core", rev = "1d5011bde4c343890bb58aa77415b20cb900a4a8" }
iox_query_params = { git = "https://github.com/influxdata/influxdb3_core", rev = "1d5011bde4c343890bb58aa77415b20cb900a4a8" }
iox_query_influxql = { git = "https://github.com/influxdata/influxdb3_core", rev = "1d5011bde4c343890bb58aa77415b20cb900a4a8" }
iox_system_tables = { git = "https://github.com/influxdata/influxdb3_core", rev = "1d5011bde4c343890bb58aa77415b20cb900a4a8" }
iox_time = { git = "https://github.com/influxdata/influxdb3_core", rev = "1d5011bde4c343890bb58aa77415b20cb900a4a8" }
metric = { git = "https://github.com/influxdata/influxdb3_core", rev = "1d5011bde4c343890bb58aa77415b20cb900a4a8" }
metric_exporters = { git = "https://github.com/influxdata/influxdb3_core", rev = "1d5011bde4c343890bb58aa77415b20cb900a4a8" }
observability_deps = { git = "https://github.com/influxdata/influxdb3_core", rev = "1d5011bde4c343890bb58aa77415b20cb900a4a8" }
panic_logging = { git = "https://github.com/influxdata/influxdb3_core", rev = "1d5011bde4c343890bb58aa77415b20cb900a4a8" }
parquet_file = { git = "https://github.com/influxdata/influxdb3_core", rev = "1d5011bde4c343890bb58aa77415b20cb900a4a8" }
schema = { git = "https://github.com/influxdata/influxdb3_core", rev = "1d5011bde4c343890bb58aa77415b20cb900a4a8", features = ["v3"] }
service_common = { git = "https://github.com/influxdata/influxdb3_core", rev = "1d5011bde4c343890bb58aa77415b20cb900a4a8" }
service_grpc_flight = { git = "https://github.com/influxdata/influxdb3_core", rev = "1d5011bde4c343890bb58aa77415b20cb900a4a8" }
test_helpers = { git = "https://github.com/influxdata/influxdb3_core", rev = "1d5011bde4c343890bb58aa77415b20cb900a4a8" }
tokio_metrics_bridge = { git = "https://github.com/influxdata/influxdb3_core", rev = "1d5011bde4c343890bb58aa77415b20cb900a4a8" }
trace = { git = "https://github.com/influxdata/influxdb3_core", rev = "1d5011bde4c343890bb58aa77415b20cb900a4a8" }
trace_exporters = { git = "https://github.com/influxdata/influxdb3_core", rev = "1d5011bde4c343890bb58aa77415b20cb900a4a8" }
trace_http = { git = "https://github.com/influxdata/influxdb3_core", rev = "1d5011bde4c343890bb58aa77415b20cb900a4a8" }
tracker = { git = "https://github.com/influxdata/influxdb3_core", rev = "1d5011bde4c343890bb58aa77415b20cb900a4a8" }
trogging = { git = "https://github.com/influxdata/influxdb3_core", rev = "1d5011bde4c343890bb58aa77415b20cb900a4a8", default-features = true, features = ["clap"] }
[workspace.lints.rust]
missing_copy_implementations = "deny"

View File

@ -19,10 +19,10 @@ async fn api_v3_write() {
.query(params)
.body(
"\
cpu region/us-east/host/a1 usage=42.0,temp=10 1234\n\
cpu region/us-east/host/b1 usage=10.5,temp=18 1234\n\
cpu region/us-west/host/a2 usage=88.0,temp=15 1234\n\
cpu region/us-west/host/b2 usage=92.2,temp=14 1234\n\
cpu,region/us-east/host/a1 usage=42.0,temp=10 1234\n\
cpu,region/us-east/host/b1 usage=10.5,temp=18 1234\n\
cpu,region/us-west/host/a2 usage=88.0,temp=15 1234\n\
cpu,region/us-west/host/b2 usage=92.2,temp=14 1234\n\
",
)
.send()
@ -71,25 +71,25 @@ async fn api_v3_write() {
},
// Series key out-of-order:
TestCase {
body: "cpu host/c1/region/ca-cent usage=22.0,temp=6 1236",
body: "cpu,host/c1/region/ca-cent usage=22.0,temp=6 1236",
response_contains: "write to table cpu had the incorrect series key, \
expected: [region, host], received: [host, region]",
},
// Series key with invalid member at end:
TestCase {
body: "cpu region/ca-cent/host/c1/container/foo usage=22.0,temp=6 1236",
body: "cpu,region/ca-cent/host/c1/container/foo usage=22.0,temp=6 1236",
response_contains: "write to table cpu had the incorrect series key, \
expected: [region, host], received: [region, host, container]",
},
// Series key with invalid member in middle:
TestCase {
body: "cpu region/ca-cent/sub-region/toronto/host/c1 usage=22.0,temp=6 1236",
body: "cpu,region/ca-cent/sub-region/toronto/host/c1 usage=22.0,temp=6 1236",
response_contains: "write to table cpu had the incorrect series key, \
expected: [region, host], received: [region, sub-region, host]",
},
// Series key with invalid member at start:
TestCase {
body: "cpu planet/earth/region/ca-cent/host/c1 usage=22.0,temp=6 1236",
body: "cpu,planet/earth/region/ca-cent/host/c1 usage=22.0,temp=6 1236",
response_contains: "write to table cpu had the incorrect series key, \
expected: [region, host], received: [planet, region, host]",
},

View File

@ -7,13 +7,11 @@ use arrow::record_batch::RecordBatch;
use arrow_schema::ArrowError;
use async_trait::async_trait;
use data_types::NamespaceId;
use datafusion::catalog::schema::SchemaProvider;
use datafusion::catalog::CatalogProvider;
use datafusion::catalog::{CatalogProvider, SchemaProvider, Session};
use datafusion::common::arrow::array::StringArray;
use datafusion::common::arrow::datatypes::{DataType, Field, Schema as DatafusionSchema};
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::DataFusionError;
use datafusion::execution::context::SessionState;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::logical_expr::TableProviderFilterPushDown;
use datafusion::physical_plan::ExecutionPlan;
@ -30,7 +28,7 @@ use iox_query::query_log::QueryLog;
use iox_query::query_log::QueryText;
use iox_query::query_log::StateReceived;
use iox_query::query_log::{QueryCompletedToken, QueryLogEntries};
use iox_query::QueryDatabase;
use iox_query::{Extension, QueryDatabase};
use iox_query::{QueryChunk, QueryNamespace};
use iox_query_influxql::frontend::planner::InfluxQLQueryPlanner;
use iox_query_params::StatementParams;
@ -124,10 +122,10 @@ impl QueryExecutor for QueryExecutorImpl {
let planner = SqlQueryPlanner::new();
(planner.query(query, params.clone(), &ctx).await, "sql")
}
QueryKind::InfluxQl => {
let planner = InfluxQLQueryPlanner::new();
(planner.query(query, params.clone(), &ctx).await, "influxql")
}
QueryKind::InfluxQl => (
InfluxQLQueryPlanner::query(query, params.clone(), &ctx).await,
"influxql",
),
};
let token = db.record_query(
external_span_ctx.as_ref().map(RequestLogContext::ctx),
@ -450,6 +448,15 @@ impl QueryNamespace for Database {
);
ctx
}
/// Stub: not implemented here; calling this method panics via `unimplemented!()`.
///
/// All three arguments (`_extension`, `_span_ctx`, `_query_config`) are ignored, as their
/// `_` prefixes indicate.
///
/// # Panics
///
/// Always panics.
// NOTE(review): presumably a required method of the `QueryNamespace` trait after the core
// sync (the enclosing impl is `impl QueryNamespace for Database`) — confirm against iox_query.
fn new_extended_query_context(
&self,
_extension: Arc<dyn Extension>,
_span_ctx: Option<SpanContext>,
_query_config: Option<&QueryConfig>,
) -> IOxSessionContext {
unimplemented!();
}
}
const LAST_CACHE_UDTF_NAME: &str = "last_cache";
@ -508,7 +515,7 @@ pub struct QueryTable {
impl QueryTable {
fn chunks(
&self,
ctx: &SessionState,
ctx: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
_limit: Option<usize>,
@ -546,7 +553,7 @@ impl TableProvider for QueryTable {
async fn scan(
&self,
ctx: &SessionState,
ctx: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,

View File

@ -1,8 +1,6 @@
use std::{any::Any, collections::HashMap, sync::Arc};
use datafusion::{
catalog::schema::SchemaProvider, datasource::TableProvider, error::DataFusionError,
};
use datafusion::{catalog::SchemaProvider, datasource::TableProvider, error::DataFusionError};
use influxdb3_write::WriteBuffer;
use iox_query::query_log::QueryLog;
use iox_system_tables::SystemTableProvider;

View File

@ -139,13 +139,16 @@ fn from_query_log_entries(
));
columns.push(Arc::new(
entries.iter().map(|e| e.partitions).collect::<Int64Array>(),
entries
.iter()
.map(|e| e.partitions.map(|p| p as i64))
.collect::<Int64Array>(),
));
columns.push(Arc::new(
entries
.iter()
.map(|e| e.parquet_files)
.map(|e| e.parquet_files.map(|p| p as i64))
.collect::<Int64Array>(),
));

View File

@ -2385,7 +2385,7 @@ mod tests {
// Do one write to update the catalog with a db and table:
wbuf.write_lp_v3(
NamespaceName::new(db_name).unwrap(),
format!("{tbl_name} state/ca/county/napa/farm/10-01 speed=60").as_str(),
format!("{tbl_name},state/ca/county/napa/farm/10-01 speed=60").as_str(),
Time::from_timestamp_nanos(500),
false,
Precision::Nanosecond,
@ -2403,12 +2403,12 @@ mod tests {
NamespaceName::new(db_name).unwrap(),
format!(
"\
{tbl_name} state/ca/county/napa/farm/10-01 speed=50\n\
{tbl_name} state/ca/county/napa/farm/10-02 speed=49\n\
{tbl_name} state/ca/county/orange/farm/20-01 speed=40\n\
{tbl_name} state/ca/county/orange/farm/20-02 speed=33\n\
{tbl_name} state/ca/county/yolo/farm/30-01 speed=62\n\
{tbl_name} state/ca/county/nevada/farm/40-01 speed=66\n\
{tbl_name},state/ca/county/napa/farm/10-01 speed=50\n\
{tbl_name},state/ca/county/napa/farm/10-02 speed=49\n\
{tbl_name},state/ca/county/orange/farm/20-01 speed=40\n\
{tbl_name},state/ca/county/orange/farm/20-02 speed=33\n\
{tbl_name},state/ca/county/yolo/farm/30-01 speed=62\n\
{tbl_name},state/ca/county/nevada/farm/40-01 speed=66\n\
"
)
.as_str(),

View File

@ -3,9 +3,9 @@ use std::{any::Any, sync::Arc};
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion::{
catalog::Session,
common::{plan_err, Result},
datasource::{function::TableFunctionImpl, TableProvider, TableType},
execution::context::SessionState,
logical_expr::{Expr, TableProviderFilterPushDown},
physical_plan::{memory::MemoryExec, ExecutionPlan},
scalar::ScalarValue,
@ -44,7 +44,7 @@ impl TableProvider for LastCacheFunctionProvider {
async fn scan(
&self,
ctx: &SessionState,
ctx: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
_limit: Option<usize>,

View File

@ -13,8 +13,8 @@ pub mod write_buffer;
use async_trait::async_trait;
use data_types::{NamespaceName, TimestampMinMax};
use datafusion::catalog::Session;
use datafusion::error::DataFusionError;
use datafusion::execution::context::SessionState;
use datafusion::prelude::Expr;
use influxdb3_catalog::catalog::{self, SequenceNumber};
use influxdb3_wal::{LastCacheDefinition, SnapshotSequenceNumber, WalFileSequenceNumber};
@ -89,7 +89,7 @@ pub trait ChunkContainer: Debug + Send + Sync + 'static {
table_name: &str,
filters: &[Expr],
projection: Option<&Vec<usize>>,
ctx: &SessionState,
ctx: &dyn Session,
) -> Result<Vec<Arc<dyn QueryChunk>>, DataFusionError>;
}

View File

@ -18,9 +18,9 @@ use crate::{
};
use async_trait::async_trait;
use data_types::{ChunkId, ChunkOrder, ColumnType, NamespaceName, NamespaceNameError};
use datafusion::catalog::Session;
use datafusion::common::DataFusionError;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::execution::context::SessionState;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::SendableRecordBatchStream;
use influxdb3_catalog::catalog::Catalog;
@ -280,7 +280,7 @@ impl WriteBufferImpl {
table_name: &str,
filters: &[Expr],
projection: Option<&Vec<usize>>,
ctx: &SessionState,
ctx: &dyn Session,
) -> Result<Vec<Arc<dyn QueryChunk>>, DataFusionError> {
let db_schema = self
.catalog
@ -500,7 +500,7 @@ impl ChunkContainer for WriteBufferImpl {
table_name: &str,
filters: &[Expr],
projection: Option<&Vec<usize>>,
ctx: &SessionState,
ctx: &dyn Session,
) -> crate::Result<Vec<Arc<dyn QueryChunk>>, DataFusionError> {
self.get_table_chunks(database_name, table_name, filters, projection, ctx)
}

View File

@ -10,8 +10,8 @@ use async_trait::async_trait;
use data_types::{
ChunkId, ChunkOrder, PartitionKey, TableId, TimestampMinMax, TransitionPartitionId,
};
use datafusion::catalog::Session;
use datafusion::common::DataFusionError;
use datafusion::execution::context::SessionState;
use datafusion::logical_expr::Expr;
use datafusion_util::stream_from_batches;
use hashbrown::HashMap;
@ -67,7 +67,7 @@ impl QueryableBuffer {
table_name: &str,
filters: &[Expr],
_projection: Option<&Vec<usize>>,
_ctx: &SessionState,
_ctx: &dyn Session,
) -> Result<Vec<Arc<dyn QueryChunk>>, DataFusionError> {
let table = db_schema
.tables
@ -453,6 +453,7 @@ async fn sort_dedupe_persist(
let logical_plan = ReorgPlanner::new()
.compact_plan(
TableId::new(0),
persist_job.table_name,
&persist_job.schema,
chunks,