refactor: move to `ColumnId` and `Arc<str>` as much as possible (#25495)

Closes #25461 

_Note: the first three commits on this PR are from https://github.com/influxdata/influxdb/pull/25492_

This PR makes the switch from using names for columns to the use of `ColumnId`s. Where column names are used, they are represented as `Arc<str>`. This impacts most components of the system, and the result is a fairly sizeable change set. The area where the most refactoring was needed was in the last-n-value cache.
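
To illustrate why `Arc<str>` suits names that are held in many places, here is a minimal sketch using only the standard library. The helper `share_name` is hypothetical and is not part of this PR; it only shows that cloning an `Arc<str>` bumps a reference count rather than copying the string bytes.

```rust
use std::sync::Arc;

/// Hypothetical helper: hand out `copies` handles to a single shared name.
/// Each `Arc::clone` increments a reference count; the string data itself
/// is allocated once and never copied.
pub fn share_name(name: &str, copies: usize) -> Vec<Arc<str>> {
    let shared: Arc<str> = Arc::from(name);
    (0..copies).map(|_| Arc::clone(&shared)).collect()
}
```

Every handle returned by `share_name` points at the same allocation, so `Arc::ptr_eq` on any two of them returns `true`.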

One of the themes of this PR is to rely less on the Arrow `Schema` for handling column-level information, and instead to track that info in our own `ColumnDefinition` type, which captures the `ColumnId`.

I will summarize the various changes in the PR below, and also leave some comments in-line in the PR.

## Switch to `u32` for `ColumnId`

The `ColumnId` now follows the `DbId` and `TableId`, and uses a globally unique `u32` to identify all columns in the database. This is a change from using a `u16` that was only unique within the column's table. It makes it easier to follow the patterns used for creating the other identifier types when dealing with columns, and should reduce the burden of having to manage the state of a table-scoped identifier.
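
The pattern can be sketched with a process-wide atomic counter. The method names below follow the `influxdb3_id` changes in this PR's diff, but this is a standalone sketch using only the standard library, and the comment about when `set_next_id` would be called is an assumption rather than something the PR states.

```rust
use std::sync::atomic::{AtomicU32, Ordering};

/// Process-wide counter shared by every table, so IDs never repeat
/// across tables within one process.
static NEXT_COLUMN_ID: AtomicU32 = AtomicU32::new(0);

#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ColumnId(u32);

impl ColumnId {
    /// Allocate a fresh ID and advance the counter.
    pub fn new() -> Self {
        Self(NEXT_COLUMN_ID.fetch_add(1, Ordering::SeqCst))
    }

    /// Peek at the ID that the next call to `new` would return.
    pub fn next_id() -> Self {
        Self(NEXT_COLUMN_ID.load(Ordering::SeqCst))
    }

    /// Move the counter to this ID (assumption: useful when restoring
    /// state, so that new IDs continue after the highest one in use).
    pub fn set_next_id(&self) {
        NEXT_COLUMN_ID.store(self.0, Ordering::SeqCst)
    }

    pub fn as_u32(&self) -> u32 {
        self.0
    }
}

impl From<u32> for ColumnId {
    fn from(value: u32) -> Self {
        Self(value)
    }
}
```

Because the counter is global, two consecutive calls to `ColumnId::new()` on a single thread return consecutive values regardless of which table the columns belong to.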

## Changes in the WAL/Catalog

* `WriteBatch` now contains no names for tables or columns and purely uses IDs
* This PR relies on `IndexMap` for `_Id`-keyed maps so that the order of elements in the map is consistent. `IndexMap` preserves insertion order, so iterating over an ID map always produces its elements in the same order. This allows us to make assertions on column order in a lot of our tests, and allows for the re-introduction of `insta` snapshots for serialization tests. This map type provides O(1) average-case lookups and also _fast_ iteration, which should help when serializing these maps in write batches to the WAL.
* Removed the need to serialize the bi-directional maps for `DatabaseSchema`/`TableDefinition` via use of `SerdeVecMap` (see comments in-line)  
* The `tables` map in `DatabaseSchema` now stores an `Arc<TableDefinition>` so that the table definition can be shared around more easily. This means that changes to tables in the catalog need to do a clone, but we were already having to do a clone for changes to the DB schema.
* Removal of the `TableSchema` type and consolidation of its parts/functions directly onto `TableDefinition`
* Added the `ColumnDefinition` type, which represents all we need to know about a column, and is used in place of the Arrow `Schema` for column-level meta-info. We were previously relying heavily on the `Schema` for iterating over columns, accessing data types, etc., but this gives us an API that we have more control over for our needs. The `Schema` is still held at the `TableDefinition` level, as it is needed for the query path, and is maintained to be consistent with what is contained in the `ColumnDefinition`s for a table.
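
The ordering and lookup ideas in the list above can be sketched without external crates. The types below are hypothetical stand-ins: `ColumnDef` is a simplified column record, and `OrderedIdMap` imitates the insertion-ordered behaviour the PR gets from `IndexMap` by pairing a `Vec` with a `HashMap` index. This is not how `IndexMap` or `SerdeVecMap` is implemented; it only shows why iteration order is deterministic and how a name lookup can be rebuilt from the entries instead of being serialized.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical, simplified column record keyed by a numeric ID.
#[derive(Debug, Clone, PartialEq)]
pub struct ColumnDef {
    pub id: u32,
    pub name: Arc<str>,
    pub nullable: bool,
}

/// Insertion-ordered, ID-keyed map: the `Vec` fixes iteration order and
/// the `HashMap` maps each ID to its position for average O(1) lookup.
#[derive(Debug, Default)]
pub struct OrderedIdMap {
    entries: Vec<(u32, ColumnDef)>,
    index: HashMap<u32, usize>,
}

impl OrderedIdMap {
    /// Insert a column; re-inserting an existing ID keeps its position.
    pub fn insert(&mut self, def: ColumnDef) {
        let existing = self.index.get(&def.id).copied();
        match existing {
            Some(pos) => self.entries[pos].1 = def,
            None => {
                self.index.insert(def.id, self.entries.len());
                self.entries.push((def.id, def));
            }
        }
    }

    pub fn get(&self, id: u32) -> Option<&ColumnDef> {
        self.index.get(&id).map(|&pos| &self.entries[pos].1)
    }

    /// Iterate in insertion order, the same order on every call.
    pub fn iter(&self) -> impl Iterator<Item = &ColumnDef> {
        self.entries.iter().map(|(_, def)| def)
    }

    /// Rebuild the name -> ID lookup from the entries themselves.
    pub fn name_to_id(&self) -> HashMap<Arc<str>, u32> {
        self.iter().map(|d| (Arc::clone(&d.name), d.id)).collect()
    }
}
```

Inserting columns with IDs 5, 0, 3 and then iterating yields 5, 0, 3 every time, which is the property that makes snapshot tests stable.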

## Changes in the Last-N-Value Cache

* There is a bigger distinction between caches that have an explicit set of value columns, and those that accept new fields. The former should be more performant.
* The Arrow `Schema` is managed differently now: it used to be updated more than it needed to be, and now is only updated when a row with new fields is pushed to a cache that accepts new fields.
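
The distinction between the two kinds of cache can be sketched as follows. Everything here is hypothetical: `ValueColumns` loosely mirrors the `Explicit` / `AllNonKeyColumns` variants of `LastCacheValueColumnsDef`, columns are plain `u32` IDs, and a counter stands in for rebuilding the Arrow `Schema`. The real cache is considerably more involved.

```rust
/// Hypothetical mirror of the two value-column modes.
#[derive(Debug)]
pub enum ValueColumns {
    /// Fixed set of value columns; the schema never changes.
    Explicit(Vec<u32>),
    /// Accepts new fields as they appear in incoming rows.
    AllNonKeyColumns,
}

#[derive(Debug)]
pub struct LastCacheSketch {
    value_columns: ValueColumns,
    known_columns: Vec<u32>,
    /// Stand-in for the number of times the schema was rebuilt.
    schema_updates: usize,
}

impl LastCacheSketch {
    pub fn new(value_columns: ValueColumns, known_columns: Vec<u32>) -> Self {
        Self { value_columns, known_columns, schema_updates: 0 }
    }

    /// Push a row, described here only by the column IDs it contains.
    pub fn push(&mut self, row_columns: &[u32]) {
        // Explicit caches skip schema handling entirely.
        if matches!(self.value_columns, ValueColumns::Explicit(_)) {
            return;
        }
        let mut grew = false;
        for &col in row_columns {
            if !self.known_columns.contains(&col) {
                self.known_columns.push(col);
                grew = true;
            }
        }
        // Only a row that introduces new fields triggers an update.
        if grew {
            self.schema_updates += 1;
        }
    }

    pub fn schema_updates(&self) -> usize {
        self.schema_updates
    }

    pub fn known_columns(&self) -> &[u32] {
        &self.known_columns
    }
}
```

In this sketch, a cache with explicit value columns never pays for schema checks, while an open cache pays only when a row actually carries a column it has not seen before.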

## Changes in the write-path

* When ingesting, during validation, field names are qualified to their associated column ID
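
A minimal sketch of resolving field names to column IDs during validation is shown below. The function `qualify_fields` and its parameters are hypothetical, and allocating a new ID for an unknown field at this point is an assumption about the write path, not something the PR description confirms.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical helper: map each incoming field name to its column ID.
/// Known names reuse their existing ID; unknown names are assigned the
/// next free ID and recorded so later lookups are consistent.
pub fn qualify_fields(
    known: &mut HashMap<Arc<str>, u32>,
    next_id: &mut u32,
    field_names: &[&str],
) -> Vec<u32> {
    field_names
        .iter()
        .map(|name| {
            if let Some(id) = known.get(*name).copied() {
                return id;
            }
            let id = *next_id;
            *next_id += 1;
            known.insert(Arc::from(*name), id);
            id
        })
        .collect()
}
```

After this step, the rest of the pipeline in the sketch can work purely with IDs, which matches the stated goal of a `WriteBatch` that carries no table or column names.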
Trevor Hilton 2024-11-01 16:42:57 -04:00 committed by GitHub
parent 0e814f5d52
commit d26a73802a
31 changed files with 2627 additions and 1716 deletions

5
Cargo.lock generated

@ -2645,6 +2645,8 @@ version = "0.1.0"
dependencies = [
"arrow",
"bimap",
"hashbrown 0.14.5",
"indexmap 2.6.0",
"influxdb-line-protocol",
"influxdb3_id",
"influxdb3_wal",
@ -2680,7 +2682,7 @@ dependencies = [
name = "influxdb3_id"
version = "0.1.0"
dependencies = [
"hashbrown 0.14.5",
"indexmap 2.6.0",
"serde",
"serde_json",
]
@ -2842,6 +2844,7 @@ dependencies = [
"data_types",
"futures-util",
"hashbrown 0.14.5",
"indexmap 2.6.0",
"influxdb-line-protocol",
"influxdb3_id",
"iox_time",


@ -26,6 +26,7 @@ async fn api_v3_configure_last_cache_create() {
#[derive(Default)]
struct TestCase {
// These attributes all map to parameters of the request body:
description: &'static str,
db: Option<&'static str>,
table: Option<&'static str>,
cache_name: Option<&'static str>,
@ -38,58 +39,58 @@ async fn api_v3_configure_last_cache_create() {
}
let test_cases = [
// No parameters specified:
TestCase {
description: "no parameters specified",
expected: StatusCode::BAD_REQUEST,
..Default::default()
},
// Missing database name:
TestCase {
description: "missing database name",
table: Some(tbl_name),
expected: StatusCode::BAD_REQUEST,
..Default::default()
},
// Missing table name:
TestCase {
description: "missing table name",
db: Some(db_name),
expected: StatusCode::BAD_REQUEST,
..Default::default()
},
// Good, will use defaults for everything omitted, and get back a 201:
TestCase {
description: "Good, will use defaults for everything omitted, and get back a 201",
db: Some(db_name),
table: Some(tbl_name),
expected: StatusCode::CREATED,
..Default::default()
},
// Same as before, will be successful, but with 204:
TestCase {
description: "Same as before, will be successful, but with 204",
db: Some(db_name),
table: Some(tbl_name),
expected: StatusCode::NO_CONTENT,
..Default::default()
},
// Use a specific cache name, will succeed and create new cache:
// NOTE: this will only differ from the previous cache in name, should this actually
// be an error?
TestCase {
description: "Use a specific cache name, will succeed and create new cache",
db: Some(db_name),
table: Some(tbl_name),
cache_name: Some("my_cache"),
expected: StatusCode::CREATED,
..Default::default()
},
// Same as previous, but will get 204 because it does nothing:
TestCase {
description: "Same as previous, but will get 204 because it does nothing",
db: Some(db_name),
table: Some(tbl_name),
cache_name: Some("my_cache"),
expected: StatusCode::NO_CONTENT,
..Default::default()
},
// Same as previous, but this time try to use different parameters, this will result in
// a bad request:
TestCase {
description: "Same as previous, but this time try to use different parameters, this \
will result in a bad request",
db: Some(db_name),
table: Some(tbl_name),
cache_name: Some("my_cache"),
@ -98,32 +99,33 @@ async fn api_v3_configure_last_cache_create() {
expected: StatusCode::BAD_REQUEST,
..Default::default()
},
// Will create new cache, because key columns are unique, and so will be the name:
TestCase {
description:
"Will create new cache, because key columns are unique, and so will be the name",
db: Some(db_name),
table: Some(tbl_name),
key_cols: Some(&["t1", "t2"]),
expected: StatusCode::CREATED,
..Default::default()
},
// Same as previous, but will get 204 because nothing happens:
TestCase {
description: "Same as previous, but will get 204 because nothing happens",
db: Some(db_name),
table: Some(tbl_name),
key_cols: Some(&["t1", "t2"]),
expected: StatusCode::NO_CONTENT,
..Default::default()
},
// Use an invalid key column (by name) is a bad request:
TestCase {
description: "Use an invalid key column (by name) is a bad request",
db: Some(db_name),
table: Some(tbl_name),
key_cols: Some(&["not_a_key_column"]),
expected: StatusCode::BAD_REQUEST,
..Default::default()
},
// Use an invalid key column (by type) is a bad request:
TestCase {
description: "Use an invalid key column (by type) is a bad request",
db: Some(db_name),
table: Some(tbl_name),
// f5 is a float, which is not supported as a key column:
@ -131,16 +133,16 @@ async fn api_v3_configure_last_cache_create() {
expected: StatusCode::BAD_REQUEST,
..Default::default()
},
// Use an invalid value column is a bad request:
TestCase {
description: "Use an invalid value column is a bad request",
db: Some(db_name),
table: Some(tbl_name),
val_cols: Some(&["not_a_value_column"]),
expected: StatusCode::BAD_REQUEST,
..Default::default()
},
// Use an invalid cache size is a bad request:
TestCase {
description: "Use an invalid cache size is a bad request",
db: Some(db_name),
table: Some(tbl_name),
count: Some(11),
@ -166,7 +168,12 @@ async fn api_v3_configure_last_cache_create() {
.await
.expect("send /api/v3/configure/last_cache request");
let status = resp.status();
assert_eq!(t.expected, status, "test case ({i}) failed");
assert_eq!(
t.expected,
status,
"test case ({i}) failed, {description}",
description = t.description
);
}
}


@ -185,12 +185,12 @@ async fn last_caches_table() {
let batches = collect_stream(resp).await;
assert_batches_sorted_eq!(
[
"+-------+---------------------+----------------+---------------+-------+-------+",
"| table | name | key_columns | value_columns | count | ttl |",
"+-------+---------------------+----------------+---------------+-------+-------+",
"| cpu | cpu_host_last_cache | [host] | | 1 | 14400 |",
"| mem | mem_last_cache | [host, region] | [time, usage] | 1 | 60 |",
"+-------+---------------------+----------------+---------------+-------+-------+",
"+-------+---------------------+-------------+---------------+-------+-------+",
"| table | name | key_columns | value_columns | count | ttl |",
"+-------+---------------------+-------------+---------------+-------+-------+",
"| cpu | cpu_host_last_cache | [1] | | 1 | 14400 |",
"| mem | mem_last_cache | [6, 5] | [7, 8] | 1 | 60 |",
"+-------+---------------------+-------------+---------------+-------+-------+",
],
&batches
);
@ -204,11 +204,11 @@ async fn last_caches_table() {
.unwrap();
let batches = collect_stream(resp).await;
assert_batches_sorted_eq!([
"+-------+--------------------------------+---------------------+---------------+-------+-------+",
"| table | name | key_columns | value_columns | count | ttl |",
"+-------+--------------------------------+---------------------+---------------+-------+-------+",
"| cpu | cpu_cpu_host_region_last_cache | [cpu, host, region] | | 5 | 14400 |",
"+-------+--------------------------------+---------------------+---------------+-------+-------+",
"+-------+--------------------------------+-------------+---------------+-------+-------+",
"| table | name | key_columns | value_columns | count | ttl |",
"+-------+--------------------------------+-------------+---------------+-------+-------+",
"| cpu | cpu_cpu_host_region_last_cache | [11, 10, 9] | | 5 | 14400 |",
"+-------+--------------------------------+-------------+---------------+-------+-------+",
],
&batches
);
@ -237,11 +237,11 @@ async fn last_caches_table() {
let batches = collect_stream(resp).await;
assert_batches_sorted_eq!(
[
"+-------+----------------+----------------+---------------+-------+-----+",
"| table | name | key_columns | value_columns | count | ttl |",
"+-------+----------------+----------------+---------------+-------+-----+",
"| mem | mem_last_cache | [host, region] | [time, usage] | 1 | 60 |",
"+-------+----------------+----------------+---------------+-------+-----+",
"+-------+----------------+-------------+---------------+-------+-----+",
"| table | name | key_columns | value_columns | count | ttl |",
"+-------+----------------+-------------+---------------+-------+-----+",
"| mem | mem_last_cache | [6, 5] | [7, 8] | 1 | 60 |",
"+-------+----------------+-------------+---------------+-------+-----+",
],
&batches
);
@ -268,11 +268,11 @@ async fn last_caches_table() {
.unwrap();
let batches = collect_stream(resp).await;
assert_batches_sorted_eq!([
"+-------+--------------------------------+---------------------+---------------+-------+-------+",
"| table | name | key_columns | value_columns | count | ttl |",
"+-------+--------------------------------+---------------------+---------------+-------+-------+",
"| cpu | cpu_cpu_host_region_last_cache | [cpu, host, region] | | 5 | 14400 |",
"+-------+--------------------------------+---------------------+---------------+-------+-------+",
"+-------+--------------------------------+-------------+---------------+-------+-------+",
"| table | name | key_columns | value_columns | count | ttl |",
"+-------+--------------------------------+-------------+---------------+-------+-------+",
"| cpu | cpu_cpu_host_region_last_cache | [11, 10, 9] | | 5 | 14400 |",
"+-------+--------------------------------+-------------+---------------+-------+-------+",
],
&batches
);


@ -18,6 +18,8 @@ influxdb3_wal = { path = "../influxdb3_wal" }
# crates.io dependencies
arrow.workspace = true
bimap.workspace = true
hashbrown.workspace = true
indexmap.workspace = true
parking_lot.workspace = true
serde.workspace = true
serde_json.workspace = true

File diff suppressed because it is too large


@ -1,15 +1,79 @@
use crate::catalog::ColumnDefinition;
use crate::catalog::DatabaseSchema;
use crate::catalog::TableDefinition;
use crate::catalog::TableSchema;
use arrow::datatypes::DataType as ArrowDataType;
use bimap::BiHashMap;
use influxdb3_id::ColumnId;
use influxdb3_id::DbId;
use influxdb3_id::SerdeVecMap;
use influxdb3_id::TableId;
use influxdb3_wal::{LastCacheDefinition, LastCacheValueColumnsDef};
use schema::{InfluxColumnType, SchemaBuilder};
use schema::InfluxColumnType;
use schema::InfluxFieldType;
use schema::TIME_DATA_TIMEZONE;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::sync::Arc;
impl Serialize for DatabaseSchema {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let snapshot = DatabaseSnapshot::from(self);
snapshot.serialize(serializer)
}
}
impl<'de> Deserialize<'de> for DatabaseSchema {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
DatabaseSnapshot::deserialize(deserializer).map(Into::into)
}
}
#[derive(Debug, Serialize, Deserialize)]
struct DatabaseSnapshot {
id: DbId,
name: Arc<str>,
tables: SerdeVecMap<TableId, TableSnapshot>,
}
impl From<&DatabaseSchema> for DatabaseSnapshot {
fn from(db: &DatabaseSchema) -> Self {
Self {
id: db.id,
name: Arc::clone(&db.name),
tables: db
.tables
.iter()
.map(|(table_id, table_def)| (*table_id, table_def.as_ref().into()))
.collect(),
}
}
}
impl From<DatabaseSnapshot> for DatabaseSchema {
fn from(snap: DatabaseSnapshot) -> Self {
let mut table_map = BiHashMap::with_capacity(snap.tables.len());
let tables = snap
.tables
.into_iter()
.map(|(id, table)| {
table_map.insert(id, Arc::clone(&table.table_name));
(id, Arc::new(table.into()))
})
.collect();
Self {
id: snap.id,
name: snap.name,
tables,
table_map,
}
}
}
impl Serialize for TableDefinition {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
@ -25,7 +89,7 @@ impl<'de> Deserialize<'de> for TableDefinition {
where
D: serde::Deserializer<'de>,
{
TableSnapshot::<'de>::deserialize(deserializer).map(Into::into)
TableSnapshot::deserialize(deserializer).map(Into::into)
}
}
@ -35,59 +99,25 @@ impl<'de> Deserialize<'de> for TableDefinition {
/// This is used over serde's `Serialize`/`Deserialize` implementations on the inner `Schema` type
/// due to them being considered unstable. This type intends to mimic the structure of the Arrow
/// `Schema`, and will help guard against potential breaking changes to the Arrow Schema types.
#[serde_with::serde_as]
#[derive(Debug, Serialize, Deserialize)]
struct TableSnapshot<'a> {
struct TableSnapshot {
table_id: TableId,
table_name: &'a str,
table_name: Arc<str>,
#[serde(default, skip_serializing_if = "Option::is_none")]
key: Option<Vec<&'a str>>,
#[serde_as(as = "serde_with::MapPreventDuplicates<_, _>")]
cols: BTreeMap<&'a str, ColumnDefinition<'a>>,
key: Option<Vec<ColumnId>>,
cols: SerdeVecMap<ColumnId, ColumnDefinitionSnapshot>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
last_caches: Vec<LastCacheSnapshot<'a>>,
#[serde_as(as = "ColumnMapAsArray")]
column_map: BiHashMap<ColumnId, Arc<str>>,
next_column_id: ColumnId,
}
serde_with::serde_conv!(
ColumnMapAsArray,
BiHashMap<ColumnId, Arc<str>>,
|map: &BiHashMap<ColumnId, Arc<str>>| {
let mut vec = map.iter().fold(Vec::new(), |mut acc, (id, name)| {
acc.push(ColumnMap {
column_id: *id,
name: Arc::clone(&name)
});
acc
});
vec.sort_by_key(|col| col.column_id);
vec
},
|vec: Vec<ColumnMap>| -> Result<_, std::convert::Infallible> {
Ok(vec.into_iter().fold(BiHashMap::new(), |mut acc, column| {
acc.insert(column.column_id, column.name);
acc
}))
}
);
#[derive(Debug, Serialize, Deserialize)]
struct ColumnMap {
column_id: ColumnId,
name: Arc<str>,
last_caches: Vec<LastCacheSnapshot>,
}
/// Representation of Arrow's `DataType` for table snapshots.
///
/// Uses `#[non_exhaustive]` with the assumption that variants will be added as we support
/// more Arrow data types.
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "lowercase")]
#[non_exhaustive]
enum DataType<'a> {
enum DataType {
Null,
Bool,
I8,
@ -107,12 +137,12 @@ enum DataType<'a> {
Bin,
BigBin,
BinView,
Dict(Box<DataType<'a>>, Box<DataType<'a>>),
Time(TimeUnit, Option<&'a str>),
Dict(Box<DataType>, Box<DataType>),
Time(TimeUnit, Option<Arc<str>>),
}
/// Representation of Arrow's `TimeUnit` for table snapshots.
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
enum TimeUnit {
#[serde(rename = "s")]
Second,
@ -154,53 +184,80 @@ impl From<InfluxColumnType> for InfluxType {
}
}
impl From<InfluxColumnType> for DataType {
fn from(value: InfluxColumnType) -> Self {
match value {
InfluxColumnType::Tag => Self::Dict(Box::new(Self::I32), Box::new(Self::Str)),
InfluxColumnType::Field(field) => match field {
InfluxFieldType::Float => Self::F64,
InfluxFieldType::Integer => Self::I64,
InfluxFieldType::UInteger => Self::U64,
InfluxFieldType::String => Self::Str,
InfluxFieldType::Boolean => Self::Bool,
},
InfluxColumnType::Timestamp => Self::Time(TimeUnit::Nanosecond, TIME_DATA_TIMEZONE()),
}
}
}
/// The inner column definition for a [`TableSnapshot`]
#[derive(Debug, Serialize, Deserialize)]
struct ColumnDefinition<'a> {
struct ColumnDefinitionSnapshot {
name: Arc<str>,
/// The id of the column
column_id: ColumnId,
id: ColumnId,
/// The column's data type
#[serde(borrow)]
r#type: DataType<'a>,
r#type: DataType,
/// The columns Influx type
influx_type: InfluxType,
/// Whether the column can hold NULL values
nullable: bool,
}
impl<'a> From<&'a TableDefinition> for TableSnapshot<'a> {
fn from(def: &'a TableDefinition) -> Self {
let cols = def
.schema()
.iter()
.map(|(col_type, f)| {
(
f.name().as_str(),
ColumnDefinition {
column_id: def.schema.name_to_id_unchecked(f.name().as_str().into()),
r#type: f.data_type().into(),
influx_type: col_type.into(),
nullable: f.is_nullable(),
},
)
})
.collect();
let keys = def.schema().series_key();
let last_caches = def.last_caches.values().map(Into::into).collect();
impl From<ColumnDefinitionSnapshot> for ColumnDefinition {
fn from(snap: ColumnDefinitionSnapshot) -> Self {
Self {
table_id: def.table_id,
table_name: def.table_name.as_ref(),
cols,
key: keys,
last_caches,
next_column_id: def.schema.next_column_id(),
column_map: def.schema.column_map().clone(),
id: snap.id,
name: Arc::clone(&snap.name),
data_type: match snap.influx_type {
InfluxType::Tag => InfluxColumnType::Tag,
InfluxType::Field => InfluxColumnType::Field(InfluxFieldType::from(&snap.r#type)),
InfluxType::Time => InfluxColumnType::Timestamp,
},
nullable: snap.nullable,
}
}
}
impl<'a> From<&'a ArrowDataType> for DataType<'a> {
fn from(arrow_type: &'a ArrowDataType) -> Self {
impl From<&TableDefinition> for TableSnapshot {
fn from(def: &TableDefinition) -> Self {
Self {
table_id: def.table_id,
table_name: Arc::clone(&def.table_name),
key: def.series_key.clone(),
cols: def
.columns
.iter()
.map(|(col_id, col_def)| {
(
*col_id,
ColumnDefinitionSnapshot {
name: Arc::clone(&col_def.name),
id: *col_id,
r#type: col_def.data_type.into(),
influx_type: col_def.data_type.into(),
nullable: col_def.nullable,
},
)
})
.collect(),
last_caches: def.last_caches.values().map(Into::into).collect(),
}
}
}
impl From<&ArrowDataType> for DataType {
fn from(arrow_type: &ArrowDataType) -> Self {
match arrow_type {
ArrowDataType::Null => Self::Null,
ArrowDataType::Boolean => Self::Bool,
@ -215,7 +272,7 @@ impl<'a> From<&'a ArrowDataType> for DataType<'a> {
ArrowDataType::Float16 => Self::F16,
ArrowDataType::Float32 => Self::F32,
ArrowDataType::Float64 => Self::F64,
ArrowDataType::Timestamp(unit, tz) => Self::Time((*unit).into(), tz.as_deref()),
ArrowDataType::Timestamp(unit, tz) => Self::Time((*unit).into(), tz.clone()),
ArrowDataType::Date32 => unimplemented!(),
ArrowDataType::Date64 => unimplemented!(),
ArrowDataType::Time32(_) => unimplemented!(),
@ -248,45 +305,38 @@ impl<'a> From<&'a ArrowDataType> for DataType<'a> {
}
}
impl<'a> From<TableSnapshot<'a>> for TableDefinition {
fn from(snap: TableSnapshot<'a>) -> Self {
let table_name = snap.table_name.into();
impl From<TableSnapshot> for TableDefinition {
fn from(snap: TableSnapshot) -> Self {
let table_id = snap.table_id;
let mut b = SchemaBuilder::new();
b.measurement(snap.table_name.to_string());
if let Some(keys) = snap.key {
b.with_series_key(keys);
}
for (name, col) in snap.cols {
match col.influx_type {
InfluxType::Tag => {
b.influx_column(name, schema::InfluxColumnType::Tag);
}
InfluxType::Field => {
b.influx_field(name, col.r#type.into());
}
InfluxType::Time => {
b.timestamp();
}
}
}
let schema = TableSchema::new_with_mapping(
b.build().expect("valid schema from snapshot"),
snap.column_map,
snap.next_column_id,
);
let last_caches = snap
.last_caches
.into_iter()
.map(|lc_snap| (lc_snap.name.to_string(), lc_snap.into()))
.collect();
Self {
table_name,
let table_def = Self::new(
table_id,
schema,
last_caches,
snap.table_name,
snap.cols
.into_iter()
.map(|(id, def)| {
(
id,
def.name,
match def.influx_type {
InfluxType::Tag => InfluxColumnType::Tag,
InfluxType::Field => {
InfluxColumnType::Field(InfluxFieldType::from(def.r#type))
}
InfluxType::Time => InfluxColumnType::Timestamp,
},
)
})
.collect(),
snap.key,
)
.expect("serialized catalog should be valid");
Self {
last_caches: snap
.last_caches
.into_iter()
.map(|lc_snap| (Arc::clone(&lc_snap.name), lc_snap.into()))
.collect(),
..table_def
}
}
}
@ -297,8 +347,21 @@ impl<'a> From<TableSnapshot<'a>> for TableDefinition {
// has been defined to mimic the Arrow type.
//
// See <https://github.com/influxdata/influxdb_iox/issues/11111>
impl<'a> From<DataType<'a>> for schema::InfluxFieldType {
fn from(data_type: DataType<'a>) -> Self {
impl From<DataType> for InfluxFieldType {
fn from(data_type: DataType) -> Self {
match data_type {
DataType::Bool => Self::Boolean,
DataType::I64 => Self::Integer,
DataType::U64 => Self::UInteger,
DataType::F64 => Self::Float,
DataType::Str => Self::String,
other => unimplemented!("unsupported data type in catalog {other:?}"),
}
}
}
impl From<&DataType> for InfluxFieldType {
fn from(data_type: &DataType) -> Self {
match data_type {
DataType::Bool => Self::Boolean,
DataType::I64 => Self::Integer,
@ -311,27 +374,25 @@ impl<'a> From<DataType<'a>> for schema::InfluxFieldType {
}
#[derive(Debug, Serialize, Deserialize)]
struct LastCacheSnapshot<'a> {
struct LastCacheSnapshot {
table_id: TableId,
table: &'a str,
name: &'a str,
keys: Vec<&'a str>,
vals: Option<Vec<&'a str>>,
table: Arc<str>,
name: Arc<str>,
keys: Vec<ColumnId>,
vals: Option<Vec<ColumnId>>,
n: usize,
ttl: u64,
}
impl<'a> From<&'a LastCacheDefinition> for LastCacheSnapshot<'a> {
fn from(lcd: &'a LastCacheDefinition) -> Self {
impl From<&LastCacheDefinition> for LastCacheSnapshot {
fn from(lcd: &LastCacheDefinition) -> Self {
Self {
table_id: lcd.table_id,
table: &lcd.table,
name: &lcd.name,
keys: lcd.key_columns.iter().map(|v| v.as_str()).collect(),
table: Arc::clone(&lcd.table),
name: Arc::clone(&lcd.name),
keys: lcd.key_columns.to_vec(),
vals: match &lcd.value_columns {
LastCacheValueColumnsDef::Explicit { columns } => {
Some(columns.iter().map(|v| v.as_str()).collect())
}
LastCacheValueColumnsDef::Explicit { columns } => Some(columns.to_vec()),
LastCacheValueColumnsDef::AllNonKeyColumns => None,
},
n: lcd.count.into(),
@ -340,17 +401,15 @@ impl<'a> From<&'a LastCacheDefinition> for LastCacheSnapshot<'a> {
}
}
impl<'a> From<LastCacheSnapshot<'a>> for LastCacheDefinition {
fn from(snap: LastCacheSnapshot<'a>) -> Self {
impl From<LastCacheSnapshot> for LastCacheDefinition {
fn from(snap: LastCacheSnapshot) -> Self {
Self {
table_id: snap.table_id,
table: snap.table.to_string(),
name: snap.name.to_string(),
key_columns: snap.keys.iter().map(|s| s.to_string()).collect(),
table: snap.table,
name: snap.name,
key_columns: snap.keys,
value_columns: match snap.vals {
Some(cols) => LastCacheValueColumnsDef::Explicit {
columns: cols.iter().map(|s| s.to_string()).collect(),
},
Some(columns) => LastCacheValueColumnsDef::Explicit { columns },
None => LastCacheValueColumnsDef::AllNonKeyColumns,
},
count: snap


@ -0,0 +1,260 @@
---
source: influxdb3_catalog/src/catalog.rs
description: catalog serialization to help catch breaking changes
expression: catalog
---
{
"databases": [
[
0,
{
"id": 0,
"name": "test_db",
"tables": [
[
1,
{
"table_id": 1,
"table_name": "test_table_1",
"cols": [
[
5,
{
"name": "bool_field",
"id": 5,
"type": "bool",
"influx_type": "field",
"nullable": true
}
],
[
8,
{
"name": "f64_field",
"id": 8,
"type": "f64",
"influx_type": "field",
"nullable": true
}
],
[
6,
{
"name": "i64_field",
"id": 6,
"type": "i64",
"influx_type": "field",
"nullable": true
}
],
[
4,
{
"name": "string_field",
"id": 4,
"type": "str",
"influx_type": "field",
"nullable": true
}
],
[
0,
{
"name": "tag_1",
"id": 0,
"type": {
"dict": [
"i32",
"str"
]
},
"influx_type": "tag",
"nullable": true
}
],
[
1,
{
"name": "tag_2",
"id": 1,
"type": {
"dict": [
"i32",
"str"
]
},
"influx_type": "tag",
"nullable": true
}
],
[
2,
{
"name": "tag_3",
"id": 2,
"type": {
"dict": [
"i32",
"str"
]
},
"influx_type": "tag",
"nullable": true
}
],
[
3,
{
"name": "time",
"id": 3,
"type": {
"time": [
"ns",
null
]
},
"influx_type": "time",
"nullable": false
}
],
[
7,
{
"name": "u64_field",
"id": 7,
"type": "u64",
"influx_type": "field",
"nullable": true
}
]
]
}
],
[
2,
{
"table_id": 2,
"table_name": "test_table_2",
"cols": [
[
14,
{
"name": "bool_field",
"id": 14,
"type": "bool",
"influx_type": "field",
"nullable": true
}
],
[
17,
{
"name": "f64_field",
"id": 17,
"type": "f64",
"influx_type": "field",
"nullable": true
}
],
[
15,
{
"name": "i64_field",
"id": 15,
"type": "i64",
"influx_type": "field",
"nullable": true
}
],
[
13,
{
"name": "string_field",
"id": 13,
"type": "str",
"influx_type": "field",
"nullable": true
}
],
[
9,
{
"name": "tag_1",
"id": 9,
"type": {
"dict": [
"i32",
"str"
]
},
"influx_type": "tag",
"nullable": true
}
],
[
10,
{
"name": "tag_2",
"id": 10,
"type": {
"dict": [
"i32",
"str"
]
},
"influx_type": "tag",
"nullable": true
}
],
[
11,
{
"name": "tag_3",
"id": 11,
"type": {
"dict": [
"i32",
"str"
]
},
"influx_type": "tag",
"nullable": true
}
],
[
12,
{
"name": "time",
"id": 12,
"type": {
"time": [
"ns",
null
]
},
"influx_type": "time",
"nullable": false
}
],
[
16,
{
"name": "u64_field",
"id": 16,
"type": "u64",
"influx_type": "field",
"nullable": true
}
]
]
}
]
]
}
]
],
"sequence": 0,
"host_id": "sample-host-id",
"instance_id": "instance-id",
"db_map": []
}


@ -0,0 +1,117 @@
---
source: influxdb3_catalog/src/catalog.rs
description: catalog serialization to help catch breaking changes
expression: catalog
---
{
"databases": [
[
0,
{
"id": 0,
"name": "test_db",
"tables": [
[
0,
{
"table_id": 0,
"table_name": "test",
"cols": [
[
4,
{
"name": "field",
"id": 4,
"type": "str",
"influx_type": "field",
"nullable": true
}
],
[
0,
{
"name": "tag_1",
"id": 0,
"type": {
"dict": [
"i32",
"str"
]
},
"influx_type": "tag",
"nullable": true
}
],
[
1,
{
"name": "tag_2",
"id": 1,
"type": {
"dict": [
"i32",
"str"
]
},
"influx_type": "tag",
"nullable": true
}
],
[
2,
{
"name": "tag_3",
"id": 2,
"type": {
"dict": [
"i32",
"str"
]
},
"influx_type": "tag",
"nullable": true
}
],
[
3,
{
"name": "time",
"id": 3,
"type": {
"time": [
"ns",
null
]
},
"influx_type": "time",
"nullable": false
}
]
],
"last_caches": [
{
"table_id": 0,
"table": "test",
"name": "test_table_last_cache",
"keys": [
1,
2
],
"vals": [
4
],
"n": 1,
"ttl": 600
}
]
}
]
]
}
]
],
"sequence": 0,
"host_id": "sample-host-id",
"instance_id": "instance-id",
"db_map": []
}


@ -0,0 +1,106 @@
---
source: influxdb3_catalog/src/catalog.rs
description: catalog serialization to help catch breaking changes
expression: catalog
---
{
"databases": [
[
0,
{
"id": 0,
"name": "test_db",
"tables": [
[
1,
{
"table_id": 1,
"table_name": "test_table_1",
"key": [
0,
1,
2
],
"cols": [
[
4,
{
"name": "field",
"id": 4,
"type": "str",
"influx_type": "field",
"nullable": true
}
],
[
0,
{
"name": "tag_1",
"id": 0,
"type": {
"dict": [
"i32",
"str"
]
},
"influx_type": "tag",
"nullable": false
}
],
[
1,
{
"name": "tag_2",
"id": 1,
"type": {
"dict": [
"i32",
"str"
]
},
"influx_type": "tag",
"nullable": false
}
],
[
2,
{
"name": "tag_3",
"id": 2,
"type": {
"dict": [
"i32",
"str"
]
},
"influx_type": "tag",
"nullable": false
}
],
[
3,
{
"name": "time",
"id": 3,
"type": {
"time": [
"ns",
null
]
},
"influx_type": "time",
"nullable": false
}
]
]
}
]
]
}
]
],
"sequence": 0,
"host_id": "sample-host-id",
"instance_id": "instance-id",
"db_map": []
}


@ -711,7 +711,7 @@ pub struct LastCacheCreatedResponse {
/// Given name of the cache
pub name: String,
/// Columns intended to be used as predicates in the cache
pub key_columns: Vec<String>,
pub key_columns: Vec<u32>,
/// Columns that store values in the cache
pub value_columns: LastCacheValueColumnsDef,
/// The number of last values to hold in the cache
@ -726,7 +726,7 @@ pub struct LastCacheCreatedResponse {
#[serde(tag = "type", rename_all = "snake_case")]
pub enum LastCacheValueColumnsDef {
/// Explicit list of column names
Explicit { columns: Vec<String> },
Explicit { columns: Vec<u32> },
/// Stores all non-key columns
AllNonKeyColumns,
}
@ -999,10 +999,10 @@ mod tests {
r#"{
"table": "table",
"name": "cache_name",
"key_columns": ["col1", "col2"],
"key_columns": [0, 1],
"value_columns": {
"type": "explicit",
"columns": ["col3", "col4"]
"columns": [2, 3]
},
"ttl": 120,
"count": 5


@ -6,7 +6,7 @@ edition.workspace = true
license.workspace = true
[dependencies]
hashbrown.workspace = true
indexmap.workspace = true
serde.workspace = true
[dev-dependencies]


@ -6,7 +6,7 @@ use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
mod serialize;
pub use serialize::SerdeVecHashMap;
pub use serialize::SerdeVecMap;
#[derive(Debug, Copy, Clone, Eq, PartialOrd, Ord, PartialEq, Serialize, Deserialize, Hash)]
pub struct DbId(u32);
@ -90,23 +90,29 @@ impl Display for TableId {
}
#[derive(Debug, Copy, Clone, Eq, PartialOrd, Ord, PartialEq, Serialize, Deserialize, Hash)]
pub struct ColumnId(u16);
pub struct ColumnId(u32);
static NEXT_COLUMN_ID: AtomicU32 = AtomicU32::new(0);
impl ColumnId {
pub fn new(id: u16) -> Self {
Self(id)
pub fn new() -> Self {
Self(NEXT_COLUMN_ID.fetch_add(1, Ordering::SeqCst))
}
pub fn next_id(&self) -> Self {
Self(self.0 + 1)
pub fn next_id() -> Self {
Self(NEXT_COLUMN_ID.load(Ordering::SeqCst))
}
pub fn as_u16(&self) -> u16 {
pub fn set_next_id(&self) {
NEXT_COLUMN_ID.store(self.0, Ordering::SeqCst)
}
pub fn as_u32(&self) -> u32 {
self.0
}
}
impl From<u16> for ColumnId {
fn from(value: u16) -> Self {
impl From<u32> for ColumnId {
fn from(value: u32) -> Self {
Self(value)
}
}
@ -117,6 +123,12 @@ impl Display for ColumnId {
}
}
impl Default for ColumnId {
fn default() -> Self {
Self::new()
}
}
/// The next file id to be used when persisting `ParquetFile`s
pub static NEXT_FILE_ID: AtomicU64 = AtomicU64::new(0);


@ -3,9 +3,9 @@ use std::{
ops::{Deref, DerefMut},
};
use hashbrown::{
hash_map::{IntoIter, Iter, IterMut},
HashMap,
use indexmap::{
map::{IntoIter, Iter, IterMut},
IndexMap,
};
use serde::{
de::{self, SeqAccess, Visitor},
@ -13,28 +13,38 @@ use serde::{
Deserialize, Deserializer, Serialize, Serializer,
};
/// A new-type around a `HashMap` that provides special serialization and deserialization behaviour.
/// A new-type around an [`IndexMap`] that provides special serialization and deserialization behaviour.
///
/// Specifically, it will be serialized as a vector of tuples, each tuple containing a key-value
/// pair from the map. Deserialization assumes said serialization, and deserializes from the vector
/// of tuples back into the map. Traits like `Deref`, `From`, etc. are implemented on this type such
/// that it can be used as a `HashMap`.
/// that it can be used as an `IndexMap`.
///
/// During deserialization, duplicate keys are not allowed. If duplicates are found, an error
/// is returned.
///
/// The `IndexMap` type is used to preserve insertion order, and thereby iteration order. This
/// ensures consistent ordering of entities when this map is iterated over, e.g., column ordering
/// in queries, or entity ordering during serialization. Since `IndexMap` stores key/value pairs in
/// a contiguous vector, iterating over it is faster than iterating over a `HashMap`. This is
/// beneficial for WAL serialization.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SerdeVecHashMap<K: Eq + std::hash::Hash, V>(HashMap<K, V>);
pub struct SerdeVecMap<K: Eq + std::hash::Hash, V>(IndexMap<K, V>);
impl<K, V> SerdeVecHashMap<K, V>
impl<K, V> SerdeVecMap<K, V>
where
K: Eq + std::hash::Hash,
{
pub fn new() -> Self {
Self::default()
}
pub fn with_capacity(size: usize) -> Self {
Self(IndexMap::with_capacity(size))
}
}
impl<K, V> Default for SerdeVecHashMap<K, V>
impl<K, V> Default for SerdeVecMap<K, V>
where
K: Eq + std::hash::Hash,
{
@ -43,17 +53,17 @@ where
}
}
impl<K, V, T> From<T> for SerdeVecHashMap<K, V>
impl<K, V, T> From<T> for SerdeVecMap<K, V>
where
K: Eq + std::hash::Hash,
T: Into<HashMap<K, V>>,
T: Into<IndexMap<K, V>>,
{
fn from(value: T) -> Self {
Self(value.into())
}
}
impl<K, V> IntoIterator for SerdeVecHashMap<K, V>
impl<K, V> IntoIterator for SerdeVecMap<K, V>
where
K: Eq + std::hash::Hash,
{
@ -66,7 +76,7 @@ where
}
}
impl<'a, K, V> IntoIterator for &'a SerdeVecHashMap<K, V>
impl<'a, K, V> IntoIterator for &'a SerdeVecMap<K, V>
where
K: Eq + std::hash::Hash,
{
@ -79,7 +89,7 @@ where
}
}
impl<'a, K, V> IntoIterator for &'a mut SerdeVecHashMap<K, V>
impl<'a, K, V> IntoIterator for &'a mut SerdeVecMap<K, V>
where
K: Eq + std::hash::Hash,
{
@ -92,18 +102,27 @@ where
}
}
impl<K, V> Deref for SerdeVecHashMap<K, V>
impl<K, V> FromIterator<(K, V)> for SerdeVecMap<K, V>
where
K: Eq + std::hash::Hash,
{
type Target = HashMap<K, V>;
fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
Self(iter.into_iter().collect())
}
}
impl<K, V> Deref for SerdeVecMap<K, V>
where
K: Eq + std::hash::Hash,
{
type Target = IndexMap<K, V>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<K, V> DerefMut for SerdeVecHashMap<K, V>
impl<K, V> DerefMut for SerdeVecMap<K, V>
where
K: Eq + std::hash::Hash,
{
@ -112,7 +131,7 @@ where
}
}
impl<K, V> Serialize for SerdeVecHashMap<K, V>
impl<K, V> Serialize for SerdeVecMap<K, V>
where
K: Eq + std::hash::Hash + Serialize,
V: Serialize,
@ -129,7 +148,7 @@ where
}
}
impl<'de, K, V> Deserialize<'de> for SerdeVecHashMap<K, V>
impl<'de, K, V> Deserialize<'de> for SerdeVecMap<K, V>
where
K: Eq + std::hash::Hash + Deserialize<'de>,
V: Deserialize<'de>,
@ -139,7 +158,7 @@ where
D: Deserializer<'de>,
{
let v = deserializer.deserialize_seq(VecVisitor::new())?;
let mut map = HashMap::with_capacity(v.len());
let mut map = IndexMap::with_capacity(v.len());
for (k, v) in v.into_iter() {
if map.insert(k, v).is_some() {
return Err(de::Error::custom("duplicate key found"));
@ -188,20 +207,18 @@ where
#[cfg(test)]
mod tests {
use hashbrown::HashMap;
use indexmap::IndexMap;
use super::SerdeVecHashMap;
use super::SerdeVecMap;
#[test]
fn serde_vec_map_with_json() {
let map = HashMap::<u32, &str>::from_iter([(0, "foo"), (1, "bar"), (2, "baz")]);
let serde_vec_map = SerdeVecHashMap::from(map);
let map = IndexMap::<u32, &str>::from_iter([(0, "foo"), (1, "bar"), (2, "baz")]);
let serde_vec_map = SerdeVecMap::from(map);
// test round-trip to JSON:
let s = serde_json::to_string(&serde_vec_map).unwrap();
// with using a hashmap the order changes so asserting on the JSON itself is flaky, so if
// you want to see it working use --nocapture on the test...
println!("{s}");
let d: SerdeVecHashMap<u32, &str> = serde_json::from_str(&s).unwrap();
assert_eq!(r#"[[0,"foo"],[1,"bar"],[2,"baz"]]"#, s);
let d: SerdeVecMap<u32, &str> = serde_json::from_str(&s).unwrap();
assert_eq!(d, serde_vec_map);
}
}
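
To illustrate the behaviour `SerdeVecMap` relies on, here is a minimal sketch of an insertion-ordered map that round-trips through a vector of tuples and rejects duplicate keys. It is not the `indexmap` implementation: `OrderedMap`, `as_pairs`, and `from_pairs` are hypothetical names, and a `Vec` plus a side index from the standard library stands in for `IndexMap` and the serde plumbing.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical stand-in for `IndexMap`: entries live in a contiguous `Vec`
/// in insertion order, and a side index gives O(1) average lookups by key.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OrderedMap<K: Eq + Hash + Clone, V> {
    entries: Vec<(K, V)>,
    index: HashMap<K, usize>,
}

impl<K: Eq + Hash + Clone, V> OrderedMap<K, V> {
    pub fn new() -> Self {
        Self { entries: Vec::new(), index: HashMap::new() }
    }

    /// Insert a pair, returning the previous value if the key was present.
    /// An existing key keeps its original position.
    pub fn insert(&mut self, key: K, value: V) -> Option<V> {
        if let Some(&i) = self.index.get(&key) {
            return Some(std::mem::replace(&mut self.entries[i].1, value));
        }
        self.index.insert(key.clone(), self.entries.len());
        self.entries.push((key, value));
        None
    }

    pub fn get(&self, key: &K) -> Option<&V> {
        self.index.get(key).map(|&i| &self.entries[i].1)
    }

    /// The "serialized" form: key-value tuples in insertion order.
    pub fn as_pairs(&self) -> &[(K, V)] {
        &self.entries
    }

    /// The "deserialized" form: rebuild the map, rejecting duplicate keys.
    pub fn from_pairs(pairs: Vec<(K, V)>) -> Result<Self, String> {
        let mut map = Self::new();
        for (k, v) in pairs {
            if map.insert(k, v).is_some() {
                return Err("duplicate key found".to_string());
            }
        }
        Ok(map)
    }
}
```

Because iteration order equals insertion order, the serialized output is deterministic, which is what lets the test in the diff assert on the exact JSON string rather than printing it.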


@ -221,6 +221,7 @@ struct ErrorMessage<T: Serialize> {
impl Error {
/// Convert this error into an HTTP [`Response`]
fn into_response(self) -> Response<Body> {
debug!(error = ?self, "API error");
match self {
Self::WriteBuffer(WriteBufferError::CatalogUpdateError(
err @ (CatalogError::TooManyDbs
@ -250,10 +251,25 @@ impl Error {
.body(body)
.unwrap()
}
Self::WriteBuffer(err @ WriteBufferError::ColumnDoesNotExist(_)) => {
let err: ErrorMessage<()> = ErrorMessage {
error: err.to_string(),
data: None,
};
let serialized = serde_json::to_string(&err).unwrap();
let body = Body::from(serialized);
Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(body)
.unwrap()
}
Self::WriteBuffer(WriteBufferError::LastCacheError(ref lc_err)) => match lc_err {
last_cache::Error::InvalidCacheSize
| last_cache::Error::CacheAlreadyExists { .. }
| last_cache::Error::ColumnDoesNotExistByName { .. }
| last_cache::Error::ColumnDoesNotExistById { .. }
| last_cache::Error::KeyColumnDoesNotExist { .. }
| last_cache::Error::KeyColumnDoesNotExistByName { .. }
| last_cache::Error::InvalidKeyColumn
| last_cache::Error::ValueColumnDoesNotExist { .. } => Response::builder()
.status(StatusCode::BAD_REQUEST)
@ -697,9 +713,35 @@ where
.catalog()
.db_schema_and_id(&db)
.ok_or_else(|| WriteBufferError::DbDoesNotExist)?;
let table_id = db_schema
.table_name_to_id(table.as_str())
let (table_id, table_def) = db_schema
.table_definition_and_id(table.as_str())
.ok_or_else(|| WriteBufferError::TableDoesNotExist)?;
let key_columns = key_columns
.map(|names| {
names
.into_iter()
.map(|name| {
table_def
.column_def_and_id(name.as_str())
.map(|(id, def)| (id, Arc::clone(&def.name)))
.ok_or_else(|| WriteBufferError::ColumnDoesNotExist(name))
})
.collect::<Result<Vec<_>, WriteBufferError>>()
})
.transpose()?;
let value_columns = value_columns
.map(|names| {
names
.into_iter()
.map(|name| {
table_def
.column_def_and_id(name.as_str())
.map(|(id, def)| (id, Arc::clone(&def.name)))
.ok_or_else(|| WriteBufferError::ColumnDoesNotExist(name))
})
.collect::<Result<Vec<_>, WriteBufferError>>()
})
.transpose()?;
match self
.write_buffer
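
The key-column and value-column blocks above apply the same name-to-ID resolution twice. As a rough sketch of that pattern in isolation, the optional list is mapped to a `Result<Vec<_>, _>` and then flipped with `transpose`. Everything here is hypothetical and simplified: `TableDef`, `column_id_and_name`, `resolve_columns`, and `ResolveError` are illustrative names, and a bare `u32` stands in for `ColumnId`.

```rust
use std::collections::HashMap;
use std::sync::Arc;

#[derive(Debug, PartialEq, Eq)]
pub enum ResolveError {
    ColumnDoesNotExist(String),
}

/// Hypothetical, simplified table definition: just a name -> id lookup.
pub struct TableDef {
    ids_by_name: HashMap<Arc<str>, u32>,
}

impl TableDef {
    pub fn new(columns: &[(&str, u32)]) -> Self {
        Self {
            ids_by_name: columns
                .iter()
                .map(|(name, id)| (Arc::<str>::from(*name), *id))
                .collect(),
        }
    }

    /// Look up a column by name, returning its id and a shared handle to its name.
    pub fn column_id_and_name(&self, name: &str) -> Option<(u32, Arc<str>)> {
        self.ids_by_name
            .get_key_value(name)
            .map(|(n, id)| (*id, Arc::clone(n)))
    }
}

/// Resolve an optional list of names; the first unknown name aborts with an error.
pub fn resolve_columns(
    table: &TableDef,
    names: Option<Vec<String>>,
) -> Result<Option<Vec<(u32, Arc<str>)>>, ResolveError> {
    names
        .map(|names| {
            names
                .into_iter()
                .map(|name| {
                    table
                        .column_id_and_name(name.as_str())
                        .ok_or_else(|| ResolveError::ColumnDoesNotExist(name))
                })
                .collect::<Result<Vec<_>, ResolveError>>()
        })
        // Option<Result<T, E>> -> Result<Option<T>, E>
        .transpose()
}
```

A helper of this shape could serve both the key and value column arguments, though whether that refactor suits the handler is a judgment call for the author.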


@ -1,6 +1,6 @@
use std::sync::Arc;
use arrow::array::{GenericListBuilder, StringBuilder};
use arrow::array::{GenericListBuilder, UInt32Builder};
use arrow_array::{ArrayRef, RecordBatch, StringArray, UInt64Array};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion::{error::DataFusionError, logical_expr::Expr};
@ -31,12 +31,12 @@ fn last_caches_schema() -> SchemaRef {
Field::new("name", DataType::Utf8, false),
Field::new(
"key_columns",
DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
DataType::List(Arc::new(Field::new("item", DataType::UInt32, true))),
false,
),
Field::new(
"value_columns",
DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
DataType::List(Arc::new(Field::new("item", DataType::UInt32, true))),
true,
),
Field::new("count", DataType::UInt64, false),
@ -83,20 +83,20 @@ fn from_last_cache_definitions(
));
// Key Columns
columns.push({
let values_builder = StringBuilder::new();
let values_builder = UInt32Builder::new();
let mut builder = GenericListBuilder::<i32, _>::new(values_builder);
for c in caches {
c.key_columns
.iter()
.for_each(|k| builder.values().append_value(k));
.for_each(|k| builder.values().append_value(k.as_u32()));
builder.append(true);
}
Arc::new(builder.finish())
});
// Value Columns
columns.push({
let values_builder = StringBuilder::new();
let values_builder = UInt32Builder::new();
let mut builder = GenericListBuilder::<i32, _>::new(values_builder);
for c in caches {
@ -104,7 +104,7 @@ fn from_last_cache_definitions(
LastCacheValueColumnsDef::Explicit { columns } => {
columns
.iter()
.for_each(|v| builder.values().append_value(v));
.for_each(|v| builder.values().append_value(v.as_u32()));
builder.append(true);
}
LastCacheValueColumnsDef::AllNonKeyColumns => {


@ -23,6 +23,7 @@ byteorder.workspace = true
crc32fast.workspace = true
futures-util.workspace = true
hashbrown.workspace = true
indexmap.workspace = true
object_store.workspace = true
parking_lot.workspace = true
serde.workspace = true


@ -11,7 +11,8 @@ use crate::snapshot_tracker::SnapshotInfo;
use async_trait::async_trait;
use data_types::Timestamp;
use hashbrown::HashMap;
use influxdb3_id::{DbId, SerdeVecHashMap, TableId};
use indexmap::IndexMap;
use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId};
use influxdb_line_protocol::v3::SeriesValue;
use influxdb_line_protocol::FieldValue;
use iox_time::Time;
@ -241,7 +242,7 @@ pub struct TableDefinition {
pub table_name: Arc<str>,
pub table_id: TableId,
pub field_definitions: Vec<FieldDefinition>,
pub key: Option<Vec<String>>,
pub key: Option<Vec<ColumnId>>,
}
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
@ -256,9 +257,24 @@ pub struct FieldAdditions {
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct FieldDefinition {
pub name: Arc<str>,
pub id: ColumnId,
pub data_type: FieldDataType,
}
impl FieldDefinition {
pub fn new(
id: ColumnId,
name: impl Into<Arc<str>>,
data_type: impl Into<FieldDataType>,
) -> Self {
Self {
id,
name: name.into(),
data_type: data_type.into(),
}
}
}
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub enum FieldDataType {
String,
@ -307,11 +323,11 @@ pub struct LastCacheDefinition {
/// The table id the cache is associated with
pub table_id: TableId,
/// The table name the cache is associated with
pub table: String,
pub table: Arc<str>,
/// Given name of the cache
pub name: String,
pub name: Arc<str>,
/// Columns intended to be used as predicates in the cache
pub key_columns: Vec<String>,
pub key_columns: Vec<ColumnId>,
/// Columns that store values in the cache
pub value_columns: LastCacheValueColumnsDef,
/// The number of last values to hold in the cache
@ -322,12 +338,15 @@ pub struct LastCacheDefinition {
impl LastCacheDefinition {
/// Create a new [`LastCacheDefinition`] with explicit value columns
///
/// This is intended for tests and expects that the column id for the time
/// column is included in the value columns argument.
pub fn new_with_explicit_value_columns(
table_id: TableId,
table: impl Into<String>,
name: impl Into<String>,
key_columns: impl IntoIterator<Item: Into<String>>,
value_columns: impl IntoIterator<Item: Into<String>>,
table: impl Into<Arc<str>>,
name: impl Into<Arc<str>>,
key_columns: Vec<ColumnId>,
value_columns: Vec<ColumnId>,
count: usize,
ttl: u64,
) -> Result<Self, Error> {
@ -335,9 +354,9 @@ impl LastCacheDefinition {
table_id,
table: table.into(),
name: name.into(),
key_columns: key_columns.into_iter().map(Into::into).collect(),
key_columns,
value_columns: LastCacheValueColumnsDef::Explicit {
columns: value_columns.into_iter().map(Into::into).collect(),
columns: value_columns,
},
count: count.try_into()?,
ttl,
@ -345,11 +364,13 @@ impl LastCacheDefinition {
}
/// Create a new [`LastCacheDefinition`] that stores all non-key columns as value columns
///
/// This is intended for tests.
pub fn new_all_non_key_value_columns(
table_id: TableId,
table: impl Into<String>,
name: impl Into<String>,
key_columns: impl IntoIterator<Item: Into<String>>,
table: impl Into<Arc<str>>,
name: impl Into<Arc<str>>,
key_columns: Vec<ColumnId>,
count: usize,
ttl: u64,
) -> Result<Self, Error> {
@ -357,7 +378,7 @@ impl LastCacheDefinition {
table_id,
table: table.into(),
name: name.into(),
key_columns: key_columns.into_iter().map(Into::into).collect(),
key_columns,
value_columns: LastCacheValueColumnsDef::AllNonKeyColumns,
count: count.try_into()?,
ttl,
@ -371,7 +392,7 @@ impl LastCacheDefinition {
#[serde(tag = "type", rename_all = "snake_case")]
pub enum LastCacheValueColumnsDef {
/// Explicit list of column IDs
Explicit { columns: Vec<String> },
Explicit { columns: Vec<ColumnId> },
/// Stores all non-key columns
AllNonKeyColumns,
}
@ -432,9 +453,9 @@ impl PartialEq<LastCacheSize> for usize {
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct LastCacheDelete {
pub table_name: String,
pub table_name: Arc<str>,
pub table_id: TableId,
pub name: String,
pub name: Arc<str>,
}
#[serde_as]
@ -442,7 +463,7 @@ pub struct LastCacheDelete {
pub struct WriteBatch {
pub database_id: DbId,
pub database_name: Arc<str>,
pub table_chunks: SerdeVecHashMap<TableId, TableChunks>,
pub table_chunks: SerdeVecMap<TableId, TableChunks>,
pub min_time_ns: i64,
pub max_time_ns: i64,
}
@ -451,7 +472,7 @@ impl WriteBatch {
pub fn new(
database_id: DbId,
database_name: Arc<str>,
table_chunks: HashMap<TableId, TableChunks>,
table_chunks: IndexMap<TableId, TableChunks>,
) -> Self {
// find the min and max times across the table chunks
let (min_time_ns, max_time_ns) = table_chunks.values().fold(
@ -475,7 +496,7 @@ impl WriteBatch {
pub fn add_write_batch(
&mut self,
new_table_chunks: SerdeVecHashMap<TableId, TableChunks>,
new_table_chunks: SerdeVecMap<TableId, TableChunks>,
min_time_ns: i64,
max_time_ns: i64,
) {
@ -536,10 +557,19 @@ pub struct TableChunk {
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct Field {
pub name: Arc<str>,
pub id: ColumnId,
pub value: FieldData,
}
impl Field {
pub fn new(id: ColumnId, value: impl Into<FieldData>) -> Self {
Self {
id,
value: value.into(),
}
}
}
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct Row {
pub time: i64,
@ -596,6 +626,18 @@ impl<'a> From<FieldValue<'a>> for FieldData {
}
}
impl<'a> From<&FieldValue<'a>> for FieldData {
fn from(value: &FieldValue<'a>) -> Self {
match value {
FieldValue::I64(v) => Self::Integer(*v),
FieldValue::U64(v) => Self::UInteger(*v),
FieldValue::F64(v) => Self::Float(*v),
FieldValue::String(v) => Self::String(v.to_string()),
FieldValue::Boolean(v) => Self::Boolean(*v),
}
}
}
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct WalContents {
/// The min timestamp from any writes in the WAL file
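
The move from `String` to `Arc<str>` for table and cache names means a clone bumps a reference count instead of copying the bytes. A small sketch of that property follows; `CacheNames` is a hypothetical struct used only for illustration and is not a type from this PR.

```rust
use std::sync::Arc;

/// Hypothetical pair of names, held as shared string slices.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CacheNames {
    pub table: Arc<str>,
    pub name: Arc<str>,
}

impl CacheNames {
    /// Accepts `&str`, `String`, or an existing `Arc<str>` for either argument.
    pub fn new(table: impl Into<Arc<str>>, name: impl Into<Arc<str>>) -> Self {
        Self {
            table: table.into(),
            name: name.into(),
        }
    }
}
```

Cloning a `CacheNames` leaves both copies pointing at the same string allocations, which can be checked with `Arc::ptr_eq`.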


@ -613,7 +613,8 @@ mod tests {
Field, FieldData, Gen1Duration, Row, SnapshotSequenceNumber, TableChunk, TableChunks,
};
use async_trait::async_trait;
use influxdb3_id::{DbId, TableId};
use indexmap::IndexMap;
use influxdb3_id::{ColumnId, DbId, TableId};
use object_store::memory::InMemory;
use std::any::Any;
use tokio::sync::oneshot::Receiver;
@ -642,7 +643,7 @@ mod tests {
let op1 = WalOp::Write(WriteBatch {
database_id: DbId::from(0),
database_name: Arc::clone(&db_name),
table_chunks: HashMap::from([(
table_chunks: IndexMap::from([(
TableId::from(0),
TableChunks {
min_time: 1,
@ -655,11 +656,11 @@ mod tests {
time: 1,
fields: vec![
Field {
name: "f1".into(),
id: ColumnId::from(0),
value: FieldData::Integer(1),
},
Field {
name: "time".into(),
id: ColumnId::from(1),
value: FieldData::Timestamp(1),
},
],
@ -668,11 +669,11 @@ mod tests {
time: 3,
fields: vec![
Field {
name: "f1".into(),
id: ColumnId::from(0),
value: FieldData::Integer(2),
},
Field {
name: "time".into(),
id: ColumnId::from(1),
value: FieldData::Timestamp(3),
},
],
@ -691,7 +692,7 @@ mod tests {
let op2 = WalOp::Write(WriteBatch {
database_id: DbId::from(0),
database_name: Arc::clone(&db_name),
table_chunks: HashMap::from([(
table_chunks: IndexMap::from([(
TableId::from(0),
TableChunks {
min_time: 12,
@ -703,11 +704,11 @@ mod tests {
time: 12,
fields: vec![
Field {
name: "f1".into(),
id: ColumnId::from(0),
value: FieldData::Integer(3),
},
Field {
name: "time".into(),
id: ColumnId::from(1),
value: FieldData::Timestamp(62_000000000),
},
],
@ -732,7 +733,7 @@ mod tests {
ops: vec![WalOp::Write(WriteBatch {
database_id: DbId::from(0),
database_name: "db1".into(),
table_chunks: HashMap::from([(
table_chunks: IndexMap::from([(
TableId::from(0),
TableChunks {
min_time: 1,
@ -745,11 +746,11 @@ mod tests {
time: 1,
fields: vec![
Field {
name: "f1".into(),
id: ColumnId::from(0),
value: FieldData::Integer(1),
},
Field {
name: "time".into(),
id: ColumnId::from(1),
value: FieldData::Timestamp(1),
},
],
@ -758,11 +759,11 @@ mod tests {
time: 3,
fields: vec![
Field {
name: "f1".into(),
id: ColumnId::from(0),
value: FieldData::Integer(2),
},
Field {
name: "time".into(),
id: ColumnId::from(1),
value: FieldData::Timestamp(3),
},
],
@ -771,11 +772,11 @@ mod tests {
time: 12,
fields: vec![
Field {
name: "f1".into(),
id: ColumnId::from(0),
value: FieldData::Integer(3),
},
Field {
name: "time".into(),
id: ColumnId::from(1),
value: FieldData::Timestamp(62_000000000),
},
],
@ -803,7 +804,7 @@ mod tests {
ops: vec![WalOp::Write(WriteBatch {
database_id: DbId::from(0),
database_name: "db1".into(),
table_chunks: HashMap::from([(
table_chunks: IndexMap::from([(
TableId::from(0),
TableChunks {
min_time: 12,
@ -815,11 +816,11 @@ mod tests {
time: 12,
fields: vec![
Field {
name: "f1".into(),
id: ColumnId::from(0),
value: FieldData::Integer(3),
},
Field {
name: "time".into(),
id: ColumnId::from(1),
value: FieldData::Timestamp(62_000000000),
},
],
@ -876,7 +877,7 @@ mod tests {
let op3 = WalOp::Write(WriteBatch {
database_id: DbId::from(0),
database_name: Arc::clone(&db_name),
table_chunks: HashMap::from([(
table_chunks: IndexMap::from([(
TableId::from(0),
TableChunks {
min_time: 26,
@ -888,11 +889,11 @@ mod tests {
time: 26,
fields: vec![
Field {
name: "f1".into(),
id: ColumnId::from(0),
value: FieldData::Integer(3),
},
Field {
name: "time".into(),
id: ColumnId::from(1),
value: FieldData::Timestamp(128_000000000),
},
],
@ -937,7 +938,7 @@ mod tests {
ops: vec![WalOp::Write(WriteBatch {
database_id: DbId::from(0),
database_name: "db1".into(),
table_chunks: HashMap::from([(
table_chunks: IndexMap::from([(
TableId::from(0),
TableChunks {
min_time: 26,
@ -949,11 +950,11 @@ mod tests {
time: 26,
fields: vec![
Field {
name: "f1".into(),
id: ColumnId::from(0),
value: FieldData::Integer(3),
},
Field {
name: "time".into(),
id: ColumnId::from(1),
value: FieldData::Timestamp(128_000000000),
},
],


@ -91,7 +91,7 @@ mod tests {
use crate::{
Field, FieldData, Row, TableChunk, TableChunks, WalFileSequenceNumber, WalOp, WriteBatch,
};
use influxdb3_id::{DbId, SerdeVecHashMap, TableId};
use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId};
#[test]
fn test_serialize_deserialize() {
@ -100,11 +100,11 @@ mod tests {
time: 1,
fields: vec![
Field {
name: "f1".into(),
id: ColumnId::from(0),
value: FieldData::Integer(10),
},
Field {
name: "baz".into(),
id: ColumnId::from(1),
value: FieldData::Timestamp(1),
},
],
@ -116,7 +116,7 @@ mod tests {
chunk_time_to_chunk: [(1, chunk)].iter().cloned().collect(),
};
let table_id = TableId::from(2);
let mut table_chunks = SerdeVecHashMap::new();
let mut table_chunks = SerdeVecMap::new();
table_chunks.insert(table_id, chunks);
let contents = WalContents {

File diff suppressed because it is too large


@ -8,8 +8,8 @@ expression: caches
"table": "test_table_1",
"name": "test_cache_1",
"key_columns": [
"t1",
"t2"
0,
1
],
"value_columns": {
"type": "all_non_key_columns"
@ -22,13 +22,13 @@ expression: caches
"table": "test_table_2",
"name": "test_cache_2",
"key_columns": [
"t1"
6
],
"value_columns": {
"type": "explicit",
"columns": [
"f1",
"time"
8,
7
]
},
"count": 5,
@ -42,8 +42,8 @@ expression: caches
"value_columns": {
"type": "explicit",
"columns": [
"f2",
"time"
9,
7
]
},
"count": 10,


@ -10,15 +10,15 @@ use datafusion::{
physical_plan::{memory::MemoryExec, ExecutionPlan},
scalar::ScalarValue,
};
use influxdb3_catalog::catalog::TableDefinition;
use influxdb3_id::DbId;
use influxdb3_id::TableId;
use super::LastCacheProvider;
struct LastCacheFunctionProvider {
db_id: DbId,
table_id: TableId,
cache_name: String,
table_def: Arc<TableDefinition>,
cache_name: Arc<str>,
schema: SchemaRef,
provider: Arc<LastCacheProvider>,
}
@ -54,11 +54,11 @@ impl TableProvider for LastCacheFunctionProvider {
let read = self.provider.cache_map.read();
let batches = if let Some(cache) = read
.get(&self.db_id)
.and_then(|db| db.get(&self.table_id))
.and_then(|db| db.get(&self.table_def.table_id))
.and_then(|tbl| tbl.get(&self.cache_name))
{
let predicates = cache.convert_filter_exprs(filters);
cache.to_record_batches(&predicates)?
cache.to_record_batches(Arc::clone(&self.table_def), &predicates)?
} else {
// If there is no cache, it means that it was removed, in which case, we just return
// an empty set of record batches.
@ -97,27 +97,29 @@ impl TableFunctionImpl for LastCacheFunction {
}
None => None,
};
let table_id = self
let Some(table_def) = self
.provider
.catalog
.db_schema_by_id(self.db_id)
.expect("db exists")
.table_name_to_id(table_name.as_str())
.expect("table exists");
match self.provider.get_cache_name_and_schema(
.table_definition(table_name.as_str())
else {
return plan_err!("provided table name is invalid");
};
let Some((cache_name, schema)) = self.provider.get_cache_name_and_schema(
self.db_id,
table_id,
table_def.table_id,
cache_name.map(|x| x.as_str()),
) {
Some((cache_name, schema)) => Ok(Arc::new(LastCacheFunctionProvider {
db_id: self.db_id,
table_id,
cache_name,
schema,
provider: Arc::clone(&self.provider),
})),
None => plan_err!("could not find cache for the given arguments"),
}
) else {
return plan_err!("could not find cache for the given arguments");
};
Ok(Arc::new(LastCacheFunctionProvider {
db_id: self.db_id,
table_def,
cache_name,
schema,
provider: Arc::clone(&self.provider),
}))
}
}


@ -18,9 +18,9 @@ use datafusion::error::DataFusionError;
use datafusion::prelude::Expr;
use influxdb3_catalog::catalog::Catalog;
use influxdb3_catalog::catalog::{self, SequenceNumber};
use influxdb3_id::DbId;
use influxdb3_id::ParquetFileId;
use influxdb3_id::TableId;
use influxdb3_id::{ColumnId, DbId};
use influxdb3_wal::{LastCacheDefinition, SnapshotSequenceNumber, WalFileSequenceNumber};
use iox_query::QueryChunk;
use iox_time::Time;
@ -117,8 +117,8 @@ pub trait LastCacheManager: Debug + Send + Sync + 'static {
cache_name: Option<&str>,
count: Option<usize>,
ttl: Option<Duration>,
key_columns: Option<Vec<String>>,
value_columns: Option<Vec<String>>,
key_columns: Option<Vec<(ColumnId, Arc<str>)>>,
value_columns: Option<Vec<(ColumnId, Arc<str>)>>,
) -> Result<Option<LastCacheDefinition>, write_buffer::Error>;
/// Delete a last-n-value cache
///
@ -171,6 +171,8 @@ pub struct PersistedSnapshot {
pub next_db_id: DbId,
/// The next table id to be used for tables when the snapshot is loaded
pub next_table_id: TableId,
/// The next column id to be used for columns when the snapshot is loaded
pub next_column_id: ColumnId,
/// The snapshot sequence number associated with this snapshot
pub snapshot_sequence_number: SnapshotSequenceNumber,
/// The wal file sequence number that triggered this snapshot
@ -202,6 +204,7 @@ impl PersistedSnapshot {
next_file_id: ParquetFileId::next_id(),
next_db_id: DbId::next_id(),
next_table_id: TableId::next_id(),
next_column_id: ColumnId::next_id(),
snapshot_sequence_number,
wal_file_sequence_number,
catalog_sequence_number,


@ -416,7 +416,7 @@ mod tests {
use super::*;
use crate::ParquetFileId;
use influxdb3_catalog::catalog::SequenceNumber;
use influxdb3_id::{DbId, TableId};
use influxdb3_id::{ColumnId, DbId, TableId};
use influxdb3_wal::SnapshotSequenceNumber;
use object_store::memory::InMemory;
use observability_deps::tracing::info;
@ -493,6 +493,7 @@ mod tests {
next_file_id: ParquetFileId::from(0),
next_db_id: DbId::from(1),
next_table_id: TableId::from(1),
next_column_id: ColumnId::from(1),
snapshot_sequence_number: SnapshotSequenceNumber::new(0),
wal_file_sequence_number: WalFileSequenceNumber::new(0),
catalog_sequence_number: SequenceNumber::new(0),
@ -516,6 +517,7 @@ mod tests {
next_file_id: ParquetFileId::from(0),
next_db_id: DbId::from(1),
next_table_id: TableId::from(1),
next_column_id: ColumnId::from(1),
snapshot_sequence_number: SnapshotSequenceNumber::new(0),
wal_file_sequence_number: WalFileSequenceNumber::new(0),
catalog_sequence_number: SequenceNumber::default(),
@ -530,6 +532,7 @@ mod tests {
next_file_id: ParquetFileId::from(1),
next_db_id: DbId::from(1),
next_table_id: TableId::from(1),
next_column_id: ColumnId::from(1),
snapshot_sequence_number: SnapshotSequenceNumber::new(1),
wal_file_sequence_number: WalFileSequenceNumber::new(1),
catalog_sequence_number: SequenceNumber::default(),
@ -544,6 +547,7 @@ mod tests {
next_file_id: ParquetFileId::from(2),
next_db_id: DbId::from(1),
next_table_id: TableId::from(1),
next_column_id: ColumnId::from(1),
snapshot_sequence_number: SnapshotSequenceNumber::new(2),
wal_file_sequence_number: WalFileSequenceNumber::new(2),
catalog_sequence_number: SequenceNumber::default(),
@ -579,6 +583,7 @@ mod tests {
next_file_id: ParquetFileId::from(0),
next_db_id: DbId::from(1),
next_table_id: TableId::from(1),
next_column_id: ColumnId::from(1),
snapshot_sequence_number: SnapshotSequenceNumber::new(0),
wal_file_sequence_number: WalFileSequenceNumber::new(0),
catalog_sequence_number: SequenceNumber::default(),
@ -607,6 +612,7 @@ mod tests {
next_file_id: ParquetFileId::from(id),
next_db_id: DbId::from(1),
next_table_id: TableId::from(1),
next_column_id: ColumnId::from(1),
snapshot_sequence_number: SnapshotSequenceNumber::new(id),
wal_file_sequence_number: WalFileSequenceNumber::new(id),
catalog_sequence_number: SequenceNumber::new(id as u32),


@ -23,7 +23,7 @@ use datafusion::common::DataFusionError;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::logical_expr::Expr;
use influxdb3_catalog::catalog::Catalog;
use influxdb3_id::{DbId, TableId};
use influxdb3_id::{ColumnId, DbId, TableId};
use influxdb3_wal::object_store::WalObjectStore;
use influxdb3_wal::CatalogOp::CreateLastCache;
use influxdb3_wal::{
@ -79,6 +79,9 @@ pub enum Error {
#[error("tried accessing database and table that do not exist")]
TableDoesNotExist,
#[error("tried accessing column with name ({0}) that does not exist")]
ColumnDoesNotExist(String),
#[error(
"updating catalog on delete of last cache failed, you will need to delete the cache \
again on server restart"
@ -150,6 +153,11 @@ impl WriteBufferImpl {
.first()
.map(|s| s.next_table_id.set_next_id())
.unwrap_or(());
// Set the next column id to use when adding a new column
persisted_snapshots
.first()
.map(|s| s.next_column_id.set_next_id())
.unwrap_or(());
// Set the next file id to use when persisting ParquetFiles
persisted_snapshots
.first()
@ -452,27 +460,21 @@ impl LastCacheManager for WriteBufferImpl {
cache_name: Option<&str>,
count: Option<usize>,
ttl: Option<Duration>,
key_columns: Option<Vec<String>>,
value_columns: Option<Vec<String>>,
key_columns: Option<Vec<(ColumnId, Arc<str>)>>,
value_columns: Option<Vec<(ColumnId, Arc<str>)>>,
) -> Result<Option<LastCacheDefinition>, Error> {
let cache_name = cache_name.map(Into::into);
let catalog = self.catalog();
let db_schema = catalog
.db_schema_by_id(db_id)
.ok_or(Error::DbDoesNotExist)?;
let schema = db_schema
.table_schema_by_id(table_id)
let table_def = db_schema
.table_definition_by_id(table_id)
.ok_or(Error::TableDoesNotExist)?;
if let Some(info) = self.last_cache.create_cache(CreateCacheArguments {
db_id,
db_name: db_schema.name.to_string(),
table_id,
table_name: db_schema
.table_id_to_name(table_id)
.expect("table exists")
.to_string(),
schema,
table_def,
cache_name,
count,
ttl,
@ -514,10 +516,7 @@ impl LastCacheManager for WriteBufferImpl {
database_name: Arc::clone(&db_schema.name),
ops: vec![CatalogOp::DeleteLastCache(LastCacheDelete {
table_id: tbl_id,
table_name: db_schema
.table_id_to_name(tbl_id)
.expect("table exists")
.to_string(),
table_name: db_schema.table_id_to_name(tbl_id).expect("table exists"),
name: cache_name.into(),
})],
})])


@ -80,11 +80,11 @@ impl QueryableBuffer {
_projection: Option<&Vec<usize>>,
_ctx: &dyn Session,
) -> Result<Vec<Arc<dyn QueryChunk>>, DataFusionError> {
let (table_id, table_schema) = db_schema
.table_schema_and_id(table_name)
let (table_id, table_def) = db_schema
.table_definition_and_id(table_name)
.ok_or_else(|| DataFusionError::Execution(format!("table {} not found", table_name)))?;
let arrow_schema = table_schema.as_arrow();
let influx_schema = table_def.influx_schema();
let buffer = self.buffer.read();
@ -96,20 +96,20 @@ impl QueryableBuffer {
};
Ok(table_buffer
.partitioned_record_batches(Arc::clone(&arrow_schema), filters)
.partitioned_record_batches(Arc::clone(&table_def), filters)
.map_err(|e| DataFusionError::Execution(format!("error getting batches {}", e)))?
.into_iter()
.map(|(gen_time, (ts_min_max, batches))| {
let row_count = batches.iter().map(|b| b.num_rows()).sum::<usize>();
let chunk_stats = create_chunk_statistics(
Some(row_count),
&table_schema,
influx_schema,
Some(ts_min_max),
&NoColumnRanges,
);
Arc::new(BufferChunk {
batches,
schema: table_schema.clone(),
schema: influx_schema.clone(),
stats: Arc::new(chunk_stats),
partition_id: TransitionPartitionId::new(
data_types::TableId::new(0),
@ -150,7 +150,11 @@ impl QueryableBuffer {
for (database_id, table_map) in buffer.db_to_table.iter_mut() {
let db_schema = catalog.db_schema_by_id(*database_id).expect("db exists");
for (table_id, table_buffer) in table_map.iter_mut() {
let snapshot_chunks = table_buffer.snapshot(snapshot_details.end_time_marker);
let table_def = db_schema
.table_definition_by_id(*table_id)
.expect("table exists");
let snapshot_chunks =
table_buffer.snapshot(table_def, snapshot_details.end_time_marker);
for chunk in snapshot_chunks {
let table_name =
@ -375,12 +379,12 @@ impl BufferState {
for op in catalog_batch.ops {
match op {
CatalogOp::CreateLastCache(definition) => {
let table_schema = db_schema
.table_schema_by_id(definition.table_id)
let table_def = db_schema
.table_definition_by_id(definition.table_id)
.expect("table should exist");
last_cache_provider.create_cache_from_definition(
db_schema.id,
&table_schema,
table_def,
&definition,
);
}
@ -411,18 +415,20 @@ impl BufferState {
for (table_id, table_chunks) in write_batch.table_chunks {
let table_buffer = database_buffer.entry(table_id).or_insert_with(|| {
let table_schema = db_schema
let table_def = db_schema
.table_definition_by_id(table_id)
.expect("table should exist");
let sort_key = table_schema
// TODO: can we have the primary key stored on the table definition (we already have
// the series key, so that doesn't seem like too much of a stretch).
let sort_key = table_def
.influx_schema()
.primary_key()
.iter()
.map(|c| c.to_string())
.collect::<Vec<_>>();
let index_columns = table_schema.index_columns();
let index_columns = table_def.index_column_ids();
TableBuffer::new(&index_columns, SortKey::from(sort_key))
TableBuffer::new(index_columns, SortKey::from(sort_key))
});
for (chunk_time, chunk) in table_chunks.chunk_time_to_chunk {
table_buffer.buffer_chunk(chunk_time, chunk.rows);


@ -9,74 +9,66 @@ expression: catalog_json
{
"id": 0,
"name": "db",
"table_map": [
{
"name": "table",
"table_id": 0
}
],
"tables": [
[
0,
{
"cols": {
"f1": {
"column_id": 0,
"influx_type": "field",
"nullable": true,
"type": "bool"
},
"f2": {
"column_id": 3,
"influx_type": "field",
"nullable": true,
"type": "i64"
},
"t1": {
"column_id": 1,
"influx_type": "tag",
"nullable": true,
"type": {
"dict": [
"i32",
"str"
]
"cols": [
[
1,
{
"id": 1,
"influx_type": "field",
"name": "f1",
"nullable": true,
"type": "bool"
}
},
"time": {
"column_id": 2,
"influx_type": "time",
"nullable": false,
"type": {
"time": [
"ns",
null
]
],
[
3,
{
"id": 3,
"influx_type": "field",
"name": "f2",
"nullable": true,
"type": "i64"
}
}
},
"column_map": [
{
"column_id": 0,
"name": "f1"
},
{
"column_id": 1,
"name": "t1"
},
{
"column_id": 2,
"name": "time"
},
{
"column_id": 3,
"name": "f2"
}
],
[
0,
{
"id": 0,
"influx_type": "tag",
"name": "t1",
"nullable": true,
"type": {
"dict": [
"i32",
"str"
]
}
}
],
[
2,
{
"id": 2,
"influx_type": "time",
"name": "time",
"nullable": false,
"type": {
"time": [
"ns",
null
]
}
}
]
],
"last_caches": [
{
"keys": [
"t1"
0
],
"n": 1,
"name": "cache",
@ -86,7 +78,6 @@ expression: catalog_json
"vals": null
}
],
"next_column_id": 4,
"table_id": 0,
"table_name": "table"
}


@ -9,64 +9,56 @@ expression: catalog_json
{
"id": 0,
"name": "db",
"table_map": [
{
"name": "table",
"table_id": 0
}
],
"tables": [
[
0,
{
"cols": {
"f1": {
"column_id": 0,
"influx_type": "field",
"nullable": true,
"type": "bool"
},
"t1": {
"column_id": 1,
"influx_type": "tag",
"nullable": true,
"type": {
"dict": [
"i32",
"str"
]
"cols": [
[
1,
{
"id": 1,
"influx_type": "field",
"name": "f1",
"nullable": true,
"type": "bool"
}
},
"time": {
"column_id": 2,
"influx_type": "time",
"nullable": false,
"type": {
"time": [
"ns",
null
]
],
[
0,
{
"id": 0,
"influx_type": "tag",
"name": "t1",
"nullable": true,
"type": {
"dict": [
"i32",
"str"
]
}
}
}
},
"column_map": [
{
"column_id": 0,
"name": "f1"
},
{
"column_id": 1,
"name": "t1"
},
{
"column_id": 2,
"name": "time"
}
],
[
2,
{
"id": 2,
"influx_type": "time",
"name": "time",
"nullable": false,
"type": {
"time": [
"ns",
null
]
}
}
]
],
"last_caches": [
{
"keys": [
"t1"
0
],
"n": 1,
"name": "cache",
@ -76,7 +68,6 @@ expression: catalog_json
"vals": null
}
],
"next_column_id": 3,
"table_id": 0,
"table_name": "table"
}


@ -9,71 +9,62 @@ expression: catalog_json
{
"id": 0,
"name": "db",
"table_map": [
{
"name": "table",
"table_id": 0
}
],
"tables": [
[
0,
{
"cols": {
"f1": {
"column_id": 0,
"influx_type": "field",
"nullable": true,
"type": "bool"
},
"f2": {
"column_id": 3,
"influx_type": "field",
"nullable": true,
"type": "i64"
},
"t1": {
"column_id": 1,
"influx_type": "tag",
"nullable": true,
"type": {
"dict": [
"i32",
"str"
]
"cols": [
[
1,
{
"id": 1,
"influx_type": "field",
"name": "f1",
"nullable": true,
"type": "bool"
}
},
"time": {
"column_id": 2,
"influx_type": "time",
"nullable": false,
"type": {
"time": [
"ns",
null
]
],
[
3,
{
"id": 3,
"influx_type": "field",
"name": "f2",
"nullable": true,
"type": "i64"
}
}
},
"column_map": [
{
"column_id": 0,
"name": "f1"
},
{
"column_id": 1,
"name": "t1"
},
{
"column_id": 2,
"name": "time"
},
{
"column_id": 3,
"name": "f2"
}
],
[
0,
{
"id": 0,
"influx_type": "tag",
"name": "t1",
"nullable": true,
"type": {
"dict": [
"i32",
"str"
]
}
}
],
[
2,
{
"id": 2,
"influx_type": "time",
"name": "time",
"nullable": false,
"type": {
"time": [
"ns",
null
]
}
}
]
],
"next_column_id": 4,
"table_id": 0,
"table_name": "table"
}
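
Review note: in the snapshots above, `cols` changes from a name-keyed object to a list of `[id, definition]` pairs, and the separate `column_map` / `table_map` entries disappear. A minimal sketch of why that works: each definition carries its own name, so the name-to-id lookup can be rebuilt on load. This uses only `std`, and `ColDef`, `to_pairs`, and `from_pairs` are hypothetical names for illustration; the PR description says the real code uses `SerdeVecMap` and `IndexMap`, whose implementations are not shown in this diff.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for a column definition (not the PR's `ColumnDefinition`).
#[derive(Debug, Clone, PartialEq)]
struct ColDef {
    id: u32,
    name: String,
    nullable: bool,
}

/// Flatten an id-keyed map into `(id, definition)` pairs, the shape seen in the snapshot.
/// `BTreeMap` is used here only to get a deterministic order from `std`.
fn to_pairs(map: &BTreeMap<u32, ColDef>) -> Vec<(u32, ColDef)> {
    map.iter().map(|(id, def)| (*id, def.clone())).collect()
}

/// Rebuild the id-keyed map and a name -> id lookup from the pairs.
/// Because each definition stores its name, no separate name map needs to be serialized.
fn from_pairs(pairs: Vec<(u32, ColDef)>) -> (BTreeMap<u32, ColDef>, BTreeMap<String, u32>) {
    let mut by_id = BTreeMap::new();
    let mut name_to_id = BTreeMap::new();
    for (id, def) in pairs {
        name_to_id.insert(def.name.clone(), id);
        by_id.insert(id, def);
    }
    (by_id, name_to_id)
}
```

Round-tripping a map through `to_pairs` and `from_pairs` yields the same id-keyed map plus the derived name lookup, which is the property the snapshot format relies on.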


@ -5,15 +5,18 @@ use arrow::array::{
Int64Builder, StringArray, StringBuilder, StringDictionaryBuilder, TimestampNanosecondBuilder,
UInt64Builder,
};
use arrow::datatypes::{GenericStringType, Int32Type, SchemaRef};
use arrow::datatypes::{GenericStringType, Int32Type};
use arrow::record_batch::RecordBatch;
use data_types::TimestampMinMax;
use datafusion::logical_expr::{BinaryExpr, Expr};
use hashbrown::HashMap;
use influxdb3_catalog::catalog::TableDefinition;
use influxdb3_id::ColumnId;
use influxdb3_wal::{FieldData, Row};
use observability_deps::tracing::{debug, error, info};
use schema::sort::SortKey;
use schema::{InfluxColumnType, InfluxFieldType, Schema, SchemaBuilder};
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, HashSet};
use std::mem::size_of;
use std::sync::Arc;
@ -38,7 +41,7 @@ pub struct TableBuffer {
}
impl TableBuffer {
pub fn new(index_columns: &[&str], sort_key: SortKey) -> Self {
pub fn new(index_columns: Vec<ColumnId>, sort_key: SortKey) -> Self {
Self {
chunk_time_to_chunks: BTreeMap::default(),
snapshotting_chunks: vec![],
@ -67,10 +70,11 @@ impl TableBuffer {
/// The partitions are stored and returned in a `HashMap`, keyed on the generation time.
pub fn partitioned_record_batches(
&self,
schema: SchemaRef,
table_def: Arc<TableDefinition>,
filter: &[Expr],
) -> Result<HashMap<i64, (TimestampMinMax, Vec<RecordBatch>)>> {
let mut batches = HashMap::new();
let schema = table_def.schema.as_arrow();
for sc in &self.snapshotting_chunks {
let cols: std::result::Result<Vec<_>, _> = schema
.fields()
@ -97,14 +101,19 @@ impl TableBuffer {
.entry(*t)
.or_insert_with(|| (ts_min_max, Vec::new()));
*ts = ts.union(&ts_min_max);
v.push(c.record_batch(schema.clone(), filter)?);
v.push(c.record_batch(Arc::clone(&table_def), filter)?);
}
Ok(batches)
}
pub fn record_batches(&self, schema: SchemaRef, filter: &[Expr]) -> Result<Vec<RecordBatch>> {
pub fn record_batches(
&self,
table_def: Arc<TableDefinition>,
filter: &[Expr],
) -> Result<Vec<RecordBatch>> {
let mut batches =
Vec::with_capacity(self.snapshotting_chunks.len() + self.chunk_time_to_chunks.len());
let schema = table_def.schema.as_arrow();
for sc in &self.snapshotting_chunks {
let cols: std::result::Result<Vec<_>, _> = schema
@ -125,7 +134,7 @@ impl TableBuffer {
}
for c in self.chunk_time_to_chunks.values() {
batches.push(c.record_batch(schema.clone(), filter)?)
batches.push(c.record_batch(Arc::clone(&table_def), filter)?)
}
Ok(batches)
@ -157,8 +166,8 @@ impl TableBuffer {
let mut size = size_of::<Self>();
for c in self.chunk_time_to_chunks.values() {
for (k, v) in &c.data {
size += k.len() + size_of::<String>() + v.size();
for builder in c.data.values() {
size += size_of::<ColumnId>() + size_of::<String>() + builder.size();
}
size += c.index.size();
@ -167,7 +176,11 @@ impl TableBuffer {
size
}
pub fn snapshot(&mut self, older_than_chunk_time: i64) -> Vec<SnapshotChunk> {
pub fn snapshot(
&mut self,
table_def: Arc<TableDefinition>,
older_than_chunk_time: i64,
) -> Vec<SnapshotChunk> {
info!(%older_than_chunk_time, "Snapshotting table buffer");
let keys_to_remove = self
.chunk_time_to_chunks
@ -180,7 +193,7 @@ impl TableBuffer {
.map(|chunk_time| {
let chunk = self.chunk_time_to_chunks.remove(&chunk_time).unwrap();
let timestamp_min_max = chunk.timestamp_min_max();
let (schema, record_batch) = chunk.into_schema_record_batch();
let (schema, record_batch) = chunk.into_schema_record_batch(Arc::clone(&table_def));
SnapshotChunk {
chunk_time,
@ -232,7 +245,7 @@ impl std::fmt::Debug for TableBuffer {
struct MutableTableChunk {
timestamp_min: i64,
timestamp_max: i64,
data: BTreeMap<Arc<str>, Builder>,
data: BTreeMap<ColumnId, Builder>,
row_count: usize,
index: BufferIndex,
}
@ -245,14 +258,14 @@ impl MutableTableChunk {
let mut value_added = HashSet::with_capacity(r.fields.len());
for f in r.fields {
value_added.insert(f.name.clone());
value_added.insert(f.id);
match f.value {
FieldData::Timestamp(v) => {
self.timestamp_min = self.timestamp_min.min(v);
self.timestamp_max = self.timestamp_max.max(v);
let b = self.data.entry(f.name).or_insert_with(|| {
let b = self.data.entry(f.id).or_insert_with(|| {
debug!("Creating new timestamp builder");
let mut time_builder = TimestampNanosecondBuilder::new();
// append nulls for all previous rows
@ -269,20 +282,17 @@ impl MutableTableChunk {
}
}
FieldData::Tag(v) => {
if !self.data.contains_key(&f.name) {
if let Entry::Vacant(e) = self.data.entry(f.id) {
let mut tag_builder = StringDictionaryBuilder::new();
// append nulls for all previous rows
for _ in 0..(row_index + self.row_count) {
tag_builder.append_null();
}
self.data.insert(f.name.clone(), Builder::Tag(tag_builder));
e.insert(Builder::Tag(tag_builder));
}
let b = self
.data
.get_mut(&f.name)
.expect("tag builder should exist");
let b = self.data.get_mut(&f.id).expect("tag builder should exist");
if let Builder::Tag(b) = b {
self.index.add_row_if_indexed_column(b.len(), &f.name, &v);
self.index.add_row_if_indexed_column(b.len(), f.id, &v);
b.append(v)
.expect("shouldn't be able to overflow 32 bit dictionary");
} else {
@ -290,25 +300,22 @@ impl MutableTableChunk {
}
}
FieldData::Key(v) => {
if !self.data.contains_key(&f.name) {
if let Entry::Vacant(e) = self.data.entry(f.id) {
let key_builder = StringDictionaryBuilder::new();
if self.row_count > 0 {
panic!("series key columns must be passed in the very first write for a table");
}
self.data.insert(f.name.clone(), Builder::Key(key_builder));
e.insert(Builder::Key(key_builder));
}
let b = self
.data
.get_mut(&f.name)
.expect("key builder should exist");
let b = self.data.get_mut(&f.id).expect("key builder should exist");
let Builder::Key(b) = b else {
panic!("unexpected field type");
};
self.index.add_row_if_indexed_column(b.len(), &f.name, &v);
self.index.add_row_if_indexed_column(b.len(), f.id, &v);
b.append_value(v);
}
FieldData::String(v) => {
let b = self.data.entry(f.name).or_insert_with(|| {
let b = self.data.entry(f.id).or_insert_with(|| {
let mut string_builder = StringBuilder::new();
// append nulls for all previous rows
for _ in 0..(row_index + self.row_count) {
@ -323,7 +330,7 @@ impl MutableTableChunk {
}
}
FieldData::Integer(v) => {
let b = self.data.entry(f.name).or_insert_with(|| {
let b = self.data.entry(f.id).or_insert_with(|| {
let mut int_builder = Int64Builder::new();
// append nulls for all previous rows
for _ in 0..(row_index + self.row_count) {
@ -338,7 +345,7 @@ impl MutableTableChunk {
}
}
FieldData::UInteger(v) => {
let b = self.data.entry(f.name).or_insert_with(|| {
let b = self.data.entry(f.id).or_insert_with(|| {
let mut uint_builder = UInt64Builder::new();
// append nulls for all previous rows
for _ in 0..(row_index + self.row_count) {
@ -353,7 +360,7 @@ impl MutableTableChunk {
}
}
FieldData::Float(v) => {
let b = self.data.entry(f.name).or_insert_with(|| {
let b = self.data.entry(f.id).or_insert_with(|| {
let mut float_builder = Float64Builder::new();
// append nulls for all previous rows
for _ in 0..(row_index + self.row_count) {
@ -368,7 +375,7 @@ impl MutableTableChunk {
}
}
FieldData::Boolean(v) => {
let b = self.data.entry(f.name).or_insert_with(|| {
let b = self.data.entry(f.id).or_insert_with(|| {
let mut bool_builder = BooleanBuilder::new();
// append nulls for all previous rows
for _ in 0..(row_index + self.row_count) {
@ -410,25 +417,32 @@ impl MutableTableChunk {
TimestampMinMax::new(self.timestamp_min, self.timestamp_max)
}
fn record_batch(&self, schema: SchemaRef, filter: &[Expr]) -> Result<RecordBatch> {
let row_ids = self.index.get_rows_from_index_for_filter(filter);
fn record_batch(
&self,
table_def: Arc<TableDefinition>,
filter: &[Expr],
) -> Result<RecordBatch> {
let row_ids = self
.index
.get_rows_from_index_for_filter(Arc::clone(&table_def), filter);
let schema = table_def.schema.as_arrow();
let mut cols = Vec::with_capacity(schema.fields().len());
for f in schema.fields() {
match row_ids {
Some(row_ids) => {
let b = self
.data
.get(f.name().as_str())
let b = table_def
.column_name_to_id(f.name().as_str())
.and_then(|id| self.data.get(&id))
.ok_or_else(|| Error::FieldNotFound(f.name().to_string()))?
.get_rows(row_ids);
cols.push(b);
}
None => {
let b = self
.data
.get(f.name().as_str())
let b = table_def
.column_name_to_id(f.name().as_str())
.and_then(|id| self.data.get(&id))
.ok_or_else(|| Error::FieldNotFound(f.name().to_string()))?
.as_arrow();
cols.push(b);
@ -439,12 +453,18 @@ impl MutableTableChunk {
Ok(RecordBatch::try_new(schema, cols)?)
}
fn into_schema_record_batch(self) -> (Schema, RecordBatch) {
fn into_schema_record_batch(self, table_def: Arc<TableDefinition>) -> (Schema, RecordBatch) {
let mut cols = Vec::with_capacity(self.data.len());
let mut schema_builder = SchemaBuilder::new();
for (col_name, builder) in self.data.into_iter() {
for (col_id, builder) in self.data.into_iter() {
let (col_type, col) = builder.into_influxcol_and_arrow();
schema_builder.influx_column(col_name.as_ref(), col_type);
schema_builder.influx_column(
table_def
.column_id_to_name(col_id)
.expect("valid column id")
.as_ref(),
col_type,
);
cols.push(col);
}
let schema = schema_builder
@ -471,25 +491,25 @@ impl std::fmt::Debug for MutableTableChunk {
}
}
#[derive(Debug, Default, Clone)]
#[derive(Debug, Clone)]
struct BufferIndex {
// column name -> string value -> row indexes
columns: HashMap<Arc<str>, HashMap<String, Vec<usize>>>,
// column id -> string value -> row indexes
columns: HashMap<ColumnId, HashMap<String, Vec<usize>>>,
}
impl BufferIndex {
fn new(column_names: &[&str]) -> Self {
fn new(column_ids: Vec<ColumnId>) -> Self {
let mut columns = HashMap::new();
for c in column_names {
columns.insert(c.to_string().into(), HashMap::new());
for id in column_ids {
columns.insert(id, HashMap::new());
}
Self { columns }
}
fn add_row_if_indexed_column(&mut self, row_index: usize, column_name: &str, value: &str) {
if let Some(column) = self.columns.get_mut(column_name) {
fn add_row_if_indexed_column(&mut self, row_index: usize, column_id: ColumnId, value: &str) {
if let Some(column) = self.columns.get_mut(&column_id) {
column
.entry_ref(value)
.and_modify(|c| c.push(row_index))
@ -497,7 +517,11 @@ impl BufferIndex {
}
}
fn get_rows_from_index_for_filter(&self, filter: &[Expr]) -> Option<&Vec<usize>> {
fn get_rows_from_index_for_filter(
&self,
table_def: Arc<TableDefinition>,
filter: &[Expr],
) -> Option<&Vec<usize>> {
for expr in filter {
if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = expr {
if *op == datafusion::logical_expr::Operator::Eq {
@ -505,9 +529,9 @@ impl BufferIndex {
if let Expr::Literal(datafusion::scalar::ScalarValue::Utf8(Some(v))) =
right.as_ref()
{
return self
.columns
.get(c.name.as_str())
return table_def
.column_name_to_id(c.name())
.and_then(|id| self.columns.get(&id))
.and_then(|m| m.get(v.as_str()));
}
}
@ -521,8 +545,10 @@ impl BufferIndex {
#[allow(dead_code)]
fn size(&self) -> usize {
let mut size = size_of::<Self>();
for (k, v) in &self.columns {
size += k.len() + size_of::<String>() + size_of::<HashMap<String, Vec<usize>>>();
for (_, v) in &self.columns {
size += size_of::<ColumnId>()
+ size_of::<String>()
+ size_of::<HashMap<String, Vec<usize>>>();
for (k, v) in v {
size += k.len() + size_of::<String>() + size_of::<Vec<usize>>();
size += v.len() * size_of::<usize>();
@ -690,18 +716,34 @@ mod tests {
use super::*;
use arrow_util::{assert_batches_eq, assert_batches_sorted_eq};
use datafusion::common::Column;
use influxdb3_id::TableId;
use influxdb3_wal::Field;
use schema::{InfluxFieldType, SchemaBuilder};
use schema::InfluxFieldType;
#[test]
fn partitioned_table_buffer_batches() {
let mut table_buffer = TableBuffer::new(&["tag"], SortKey::empty());
let schema = SchemaBuilder::with_capacity(3)
.tag("tag")
.influx_field("val", InfluxFieldType::String)
.timestamp()
.build()
.unwrap();
let table_def = Arc::new(
TableDefinition::new(
TableId::new(),
"test_table".into(),
vec![
(ColumnId::from(0), "tag".into(), InfluxColumnType::Tag),
(
ColumnId::from(1),
"val".into(),
InfluxColumnType::Field(InfluxFieldType::String),
),
(
ColumnId::from(2),
"time".into(),
InfluxColumnType::Timestamp,
),
],
None,
)
.unwrap(),
);
let mut table_buffer = TableBuffer::new(vec![ColumnId::from(0)], SortKey::empty());
for t in 0..10 {
let offset = t * 10;
@ -710,15 +752,15 @@ mod tests {
time: offset + 1,
fields: vec![
Field {
name: "tag".into(),
id: ColumnId::from(0),
value: FieldData::Tag("a".to_string()),
},
Field {
name: "val".into(),
id: ColumnId::from(1),
value: FieldData::String(format!("thing {t}-1")),
},
Field {
name: "time".into(),
id: ColumnId::from(2),
value: FieldData::Timestamp(offset + 1),
},
],
@ -727,15 +769,15 @@ mod tests {
time: offset + 2,
fields: vec![
Field {
name: "tag".into(),
id: ColumnId::from(0),
value: FieldData::Tag("b".to_string()),
},
Field {
name: "val".into(),
id: ColumnId::from(1),
value: FieldData::String(format!("thing {t}-2")),
},
Field {
name: "time".into(),
id: ColumnId::from(2),
value: FieldData::Timestamp(offset + 2),
},
],
@ -746,7 +788,7 @@ mod tests {
}
let partitioned_batches = table_buffer
.partitioned_record_batches(schema.as_arrow(), &[])
.partitioned_record_batches(Arc::clone(&table_def), &[])
.unwrap();
println!("{partitioned_batches:#?}");
@ -759,20 +801,20 @@ mod tests {
assert_eq!(TimestampMinMax::new(offset + 1, offset + 2), *ts_min_max);
assert_batches_sorted_eq!(
[
"+-----+-----------+--------------------------------+",
"| tag | val | time |",
"+-----+-----------+--------------------------------+",
"+-----+--------------------------------+-----------+",
"| tag | time | val |",
"+-----+--------------------------------+-----------+",
format!(
"| a | thing {t}-1 | 1970-01-01T00:00:00.{:0>9}Z |",
"| a | 1970-01-01T00:00:00.{:0>9}Z | thing {t}-1 |",
offset + 1
)
.as_str(),
format!(
"| b | thing {t}-2 | 1970-01-01T00:00:00.{:0>9}Z |",
"| b | 1970-01-01T00:00:00.{:0>9}Z | thing {t}-2 |",
offset + 2
)
.as_str(),
"+-----+-----------+--------------------------------+",
"+-----+--------------------------------+-----------+",
],
batches
);
@ -781,28 +823,43 @@ mod tests {
#[test]
fn tag_row_index() {
let mut table_buffer = TableBuffer::new(&["tag"], SortKey::empty());
let schema = SchemaBuilder::with_capacity(3)
.tag("tag")
.influx_field("value", InfluxFieldType::Integer)
.timestamp()
.build()
.unwrap();
let table_def = Arc::new(
TableDefinition::new(
TableId::new(),
"test_table".into(),
vec![
(ColumnId::from(0), "tag".into(), InfluxColumnType::Tag),
(
ColumnId::from(1),
"value".into(),
InfluxColumnType::Field(InfluxFieldType::Integer),
),
(
ColumnId::from(2),
"time".into(),
InfluxColumnType::Timestamp,
),
],
None,
)
.unwrap(),
);
let mut table_buffer = TableBuffer::new(vec![ColumnId::from(0)], SortKey::empty());
let rows = vec![
Row {
time: 1,
fields: vec![
Field {
name: "tag".into(),
id: ColumnId::from(0),
value: FieldData::Tag("a".to_string()),
},
Field {
name: "value".into(),
id: ColumnId::from(1),
value: FieldData::Integer(1),
},
Field {
name: "time".into(),
id: ColumnId::from(2),
value: FieldData::Timestamp(1),
},
],
@ -811,15 +868,15 @@ mod tests {
time: 2,
fields: vec![
Field {
name: "tag".into(),
id: ColumnId::from(0),
value: FieldData::Tag("b".to_string()),
},
Field {
name: "value".into(),
id: ColumnId::from(1),
value: FieldData::Integer(2),
},
Field {
name: "time".into(),
id: ColumnId::from(2),
value: FieldData::Timestamp(2),
},
],
@ -828,15 +885,15 @@ mod tests {
time: 3,
fields: vec![
Field {
name: "tag".into(),
id: ColumnId::from(0),
value: FieldData::Tag("a".to_string()),
},
Field {
name: "value".into(),
id: ColumnId::from(1),
value: FieldData::Integer(3),
},
Field {
name: "time".into(),
id: ColumnId::from(2),
value: FieldData::Timestamp(3),
},
],
@ -860,20 +917,20 @@ mod tests {
.get(&0)
.unwrap()
.index
.get_rows_from_index_for_filter(filter)
.get_rows_from_index_for_filter(Arc::clone(&table_def), filter)
.unwrap();
assert_eq!(a_rows, &[0, 2]);
let a = table_buffer
.record_batches(schema.as_arrow(), filter)
.record_batches(Arc::clone(&table_def), filter)
.unwrap();
let expected_a = vec![
"+-----+-------+--------------------------------+",
"| tag | value | time |",
"+-----+-------+--------------------------------+",
"| a | 1 | 1970-01-01T00:00:00.000000001Z |",
"| a | 3 | 1970-01-01T00:00:00.000000003Z |",
"+-----+-------+--------------------------------+",
"+-----+--------------------------------+-------+",
"| tag | time | value |",
"+-----+--------------------------------+-------+",
"| a | 1970-01-01T00:00:00.000000001Z | 1 |",
"| a | 1970-01-01T00:00:00.000000003Z | 3 |",
"+-----+--------------------------------+-------+",
];
assert_batches_eq!(&expected_a, &a);
@ -893,41 +950,41 @@ mod tests {
.get(&0)
.unwrap()
.index
.get_rows_from_index_for_filter(filter)
.get_rows_from_index_for_filter(Arc::clone(&table_def), filter)
.unwrap();
assert_eq!(b_rows, &[1]);
let b = table_buffer
.record_batches(schema.as_arrow(), filter)
.record_batches(Arc::clone(&table_def), filter)
.unwrap();
let expected_b = vec![
"+-----+-------+--------------------------------+",
"| tag | value | time |",
"+-----+-------+--------------------------------+",
"| b | 2 | 1970-01-01T00:00:00.000000002Z |",
"+-----+-------+--------------------------------+",
"+-----+--------------------------------+-------+",
"| tag | time | value |",
"+-----+--------------------------------+-------+",
"| b | 1970-01-01T00:00:00.000000002Z | 2 |",
"+-----+--------------------------------+-------+",
];
assert_batches_eq!(&expected_b, &b);
}
#[test]
fn computed_size_of_buffer() {
let mut table_buffer = TableBuffer::new(&["tag"], SortKey::empty());
let mut table_buffer = TableBuffer::new(vec![ColumnId::from(0)], SortKey::empty());
let rows = vec![
Row {
time: 1,
fields: vec![
Field {
name: "tag".into(),
id: ColumnId::from(0),
value: FieldData::Tag("a".to_string()),
},
Field {
name: "value".into(),
id: ColumnId::from(1),
value: FieldData::Integer(1),
},
Field {
name: "time".into(),
id: ColumnId::from(2),
value: FieldData::Timestamp(1),
},
],
@ -936,15 +993,15 @@ mod tests {
time: 2,
fields: vec![
Field {
name: "tag".into(),
id: ColumnId::from(0),
value: FieldData::Tag("b".to_string()),
},
Field {
name: "value".into(),
id: ColumnId::from(1),
value: FieldData::Integer(2),
},
Field {
name: "time".into(),
id: ColumnId::from(2),
value: FieldData::Timestamp(2),
},
],
@ -953,15 +1010,15 @@ mod tests {
time: 3,
fields: vec![
Field {
name: "tag".into(),
id: ColumnId::from(0),
value: FieldData::Tag("this is a long tag value to store".to_string()),
},
Field {
name: "value".into(),
id: ColumnId::from(1),
value: FieldData::Integer(3),
},
Field {
name: "time".into(),
id: ColumnId::from(2),
value: FieldData::Timestamp(3),
},
],
@ -971,12 +1028,12 @@ mod tests {
table_buffer.buffer_chunk(0, rows);
let size = table_buffer.computed_size();
assert_eq!(size, 18094);
assert_eq!(size, 18095);
}
#[test]
fn timestamp_min_max_works_when_empty() {
let table_buffer = TableBuffer::new(&["tag"], SortKey::empty());
let table_buffer = TableBuffer::new(vec![ColumnId::from(0)], SortKey::empty());
let timestamp_min_max = table_buffer.timestamp_min_max();
assert_eq!(timestamp_min_max.min, 0);
assert_eq!(timestamp_min_max.max, 0);
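
Review note: the `BufferIndex` change in this file keys the index by column ID, while query filters still arrive with column names, so the name is resolved to an ID through the table definition before the lookup. A minimal sketch of that flow, using only `std`: `ColId`, `TableDef`, `Index`, and `rows_for` are hypothetical stand-ins rather than the PR's types, and the real code uses `hashbrown` and DataFusion `Expr` filters instead of plain strings.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for `ColumnId`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct ColId(u32);

/// Hypothetical stand-in for the name -> id lookup on the table definition.
struct TableDef {
    name_to_id: HashMap<String, ColId>,
}

impl TableDef {
    fn column_name_to_id(&self, name: &str) -> Option<ColId> {
        self.name_to_id.get(name).copied()
    }
}

/// column id -> string value -> row indexes
struct Index {
    columns: HashMap<ColId, HashMap<String, Vec<usize>>>,
}

impl Index {
    /// Only the given column ids are indexed.
    fn new(ids: Vec<ColId>) -> Self {
        let columns = ids.into_iter().map(|id| (id, HashMap::new())).collect();
        Self { columns }
    }

    /// Record the row under `value`, but only if `id` is an indexed column.
    fn add_row_if_indexed_column(&mut self, row: usize, id: ColId, value: &str) {
        if let Some(col) = self.columns.get_mut(&id) {
            col.entry(value.to_string()).or_default().push(row);
        }
    }

    /// Resolve the column name to an id, then look up the rows for `value`.
    fn rows_for(&self, table: &TableDef, column: &str, value: &str) -> Option<&Vec<usize>> {
        table
            .column_name_to_id(column)
            .and_then(|id| self.columns.get(&id))
            .and_then(|m| m.get(value))
    }
}
```

A lookup returns `None` both when the name is unknown to the table definition and when the column exists but is not indexed, which mirrors the `Option`-chaining in `get_rows_from_index_for_filter`.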


@ -2,17 +2,17 @@ use std::{borrow::Cow, sync::Arc};
use crate::{write_buffer::Result, Precision, WriteLineError};
use data_types::{NamespaceName, Timestamp};
use hashbrown::HashMap;
use indexmap::IndexMap;
use influxdb3_catalog::catalog::{
influx_column_type_from_field_value, Catalog, DatabaseSchema, TableDefinition,
};
use influxdb3_id::TableId;
use influxdb3_id::{ColumnId, TableId};
use influxdb3_wal::{
CatalogBatch, CatalogOp, Field, FieldAdditions, FieldData, FieldDataType, FieldDefinition,
Gen1Duration, Row, TableChunks, WriteBatch,
CatalogBatch, CatalogOp, Field, FieldAdditions, FieldData, FieldDefinition, Gen1Duration, Row,
TableChunks, WriteBatch,
};
use influxdb_line_protocol::{parse_lines, v3, FieldValue, ParsedLine};
use influxdb_line_protocol::{parse_lines, v3, ParsedLine};
use iox_time::Time;
use schema::{InfluxColumnType, TIME_COLUMN_NAME};
@ -28,10 +28,9 @@ pub(crate) struct WithCatalog {
/// Type state for the [`WriteValidator`] after it has parsed v1 or v3
/// line protocol.
pub(crate) struct LinesParsed<'raw, PL> {
pub(crate) struct LinesParsed {
catalog: WithCatalog,
db_schema: Arc<DatabaseSchema>,
lines: Vec<(PL, &'raw str)>,
lines: Vec<QualifiedLine>,
catalog_batch: Option<CatalogBatch>,
errors: Vec<WriteLineError>,
}
@ -74,28 +73,34 @@ impl WriteValidator<WithCatalog> {
self,
lp: &str,
accept_partial: bool,
) -> Result<WriteValidator<LinesParsed<'_, v3::ParsedLine<'_>>>> {
) -> Result<WriteValidator<LinesParsed>> {
let mut errors = vec![];
let mut lp_lines = lp.lines().peekable();
let mut lp_lines = lp.lines();
let mut lines = vec![];
let mut catalog_updates = vec![];
let mut schema = Cow::Borrowed(self.state.db_schema.as_ref());
for (line_idx, maybe_line) in v3::parse_lines(lp).enumerate() {
let (line, catalog_op) = match maybe_line
let (qualified_line, catalog_op) = match maybe_line
.map_err(|e| WriteLineError {
original_line: lp_lines.next().unwrap().to_string(),
line_number: line_idx + 1,
error_message: e.to_string(),
})
.and_then(|l| validate_v3_line(&mut schema, line_idx, l, lp_lines.peek().unwrap()))
{
Ok(line) => line,
Err(e) => {
if !accept_partial {
return Err(Error::ParseError(e));
.and_then(|line| {
validate_and_qualify_v3_line(
&mut schema,
line_idx,
line,
lp_lines.next().unwrap(),
)
}) {
Ok((qualified_line, catalog_ops)) => (qualified_line, catalog_ops),
Err(error) => {
if accept_partial {
errors.push(error);
} else {
errors.push(e);
return Err(Error::ParseError(error));
}
continue;
}
@ -105,7 +110,7 @@ impl WriteValidator<WithCatalog> {
catalog_updates.push(op);
}
lines.push((line, lp_lines.next().unwrap()));
lines.push(qualified_line);
}
let catalog_batch = if catalog_updates.is_empty() {
@ -121,18 +126,9 @@ impl WriteValidator<WithCatalog> {
Some(catalog_batch)
};
// if the schema has changed then the Cow will be owned so
// Arc it and pass that forward, otherwise just reuse the
// existing one
let db_schema = match schema {
Cow::Borrowed(_) => Arc::clone(&self.state.db_schema),
Cow::Owned(s) => Arc::new(s),
};
Ok(WriteValidator {
state: LinesParsed {
catalog: self.state,
db_schema,
lines,
catalog_batch,
errors,
@ -154,7 +150,7 @@ impl WriteValidator<WithCatalog> {
self,
lp: &str,
accept_partial: bool,
) -> Result<WriteValidator<LinesParsed<'_, ParsedLine<'_>>>> {
) -> Result<WriteValidator<LinesParsed>> {
let mut errors = vec![];
let mut lp_lines = lp.lines();
let mut lines = vec![];
@ -162,7 +158,7 @@ impl WriteValidator<WithCatalog> {
let mut schema = Cow::Borrowed(self.state.db_schema.as_ref());
for (line_idx, maybe_line) in parse_lines(lp).enumerate() {
let (line, catalog_op) = match maybe_line
let (qualified_line, catalog_op) = match maybe_line
.map_err(|e| WriteLineError {
// This unwrap is fine because we're moving line by line
// alongside the output from parse_lines
@ -170,9 +166,10 @@ impl WriteValidator<WithCatalog> {
line_number: line_idx + 1,
error_message: e.to_string(),
})
.and_then(|l| validate_v1_line(&mut schema, line_idx, l))
{
Ok(line) => line,
.and_then(|l| {
validate_and_qualify_v1_line(&mut schema, line_idx, l, lp_lines.next().unwrap())
}) {
Ok((qualified_line, catalog_op)) => (qualified_line, catalog_op),
Err(e) => {
if !accept_partial {
return Err(Error::ParseError(e));
@ -187,7 +184,7 @@ impl WriteValidator<WithCatalog> {
}
// This unwrap is fine because we're moving line by line
// alongside the output from parse_lines
lines.push((line, lp_lines.next().unwrap()));
lines.push(qualified_line);
}
// All lines are parsed and validated, so all steps after this
@ -206,18 +203,9 @@ impl WriteValidator<WithCatalog> {
Some(catalog_batch)
};
// if the schema has changed then the Cow will be owned so
// Arc it and pass that forward, otherwise just reuse the
// existing one
let db_schema = match schema {
Cow::Borrowed(_) => Arc::clone(&self.state.db_schema),
Cow::Owned(s) => Arc::new(s),
};
Ok(WriteValidator {
state: LinesParsed {
catalog: self.state,
db_schema,
lines,
errors,
catalog_batch,
@ -226,6 +214,9 @@ impl WriteValidator<WithCatalog> {
}
}
/// Type alias for storing new columns added by a write
type ColumnTracker = Vec<(ColumnId, Arc<str>, InfluxColumnType)>;
/// Validate an individual line of v3 line protocol and update the database
/// schema
///
@ -235,15 +226,15 @@ impl WriteValidator<WithCatalog> {
///
/// This errors if the write is being performed against a v1 table, i.e., one that does not have
/// a series key.
fn validate_v3_line<'a>(
fn validate_and_qualify_v3_line(
db_schema: &mut Cow<'_, DatabaseSchema>,
line_number: usize,
line: v3::ParsedLine<'a>,
line: v3::ParsedLine,
raw_line: &str,
) -> Result<(v3::ParsedLine<'a>, Option<CatalogOp>), WriteLineError> {
) -> Result<(QualifiedLine, Option<CatalogOp>), WriteLineError> {
let mut catalog_op = None;
let table_name = line.series.measurement.as_str();
if let Some(table_def) = db_schema.table_definition(table_name) {
let qualified = if let Some(table_def) = db_schema.table_definition(table_name) {
let table_id = table_def.table_id;
if !table_def.is_v3() {
return Err(WriteLineError {
@ -253,8 +244,11 @@ fn validate_v3_line<'a>(
.to_string(),
});
}
let mut columns = Vec::with_capacity(line.column_count() + 1);
match (table_def.schema().series_key(), &line.series.series_key) {
// TODO: may be faster to compare using table def/column IDs than comparing with schema:
match (
table_def.influx_schema().series_key(),
&line.series.series_key,
) {
(Some(s), Some(l)) => {
let l = l.iter().map(|sk| sk.0.as_str()).collect::<Vec<&str>>();
if s != l {
@ -287,17 +281,38 @@ fn validate_v3_line<'a>(
}
(None, _) => unreachable!(),
}
if let Some(series_key) = &line.series.series_key {
for (sk, _) in series_key.iter() {
if !table_def.column_exists(sk) {
columns.push((sk.to_string(), InfluxColumnType::Tag));
}
let mut columns = ColumnTracker::with_capacity(line.column_count() + 1);
// qualify the series key members:
let tag_set = if let Some(sk) = &line.series.series_key {
let mut ts = Vec::with_capacity(sk.len());
for (key, val) in sk.iter() {
let col_id =
table_def
.column_name_to_id(key.as_str())
.ok_or_else(|| WriteLineError {
original_line: raw_line.to_string(),
line_number,
error_message: format!(
"write contained invalid series key column ({key}) \
that does not exist in the catalog table definition"
),
})?;
ts.push(Field::new(col_id, val));
}
}
Some(ts)
} else {
None
};
// qualify the fields:
let mut field_set = Vec::with_capacity(line.field_set.len());
for (field_name, field_val) in line.field_set.iter() {
if let Some(schema_col_type) = table_def.field_type_by_name(field_name) {
if let Some((col_id, col_def)) = table_def.column_def_and_id(field_name.as_str()) {
let field_col_type = influx_column_type_from_field_value(field_val);
if field_col_type != schema_col_type {
let existing_col_type = col_def.data_type;
if field_col_type != existing_col_type {
let field_name = field_name.to_string();
return Err(WriteLineError {
original_line: raw_line.to_string(),
@ -305,19 +320,29 @@ fn validate_v3_line<'a>(
error_message: format!(
"invalid field value in line protocol for field '{field_name}' on line \
{line_number}: expected type {expected}, but got {got}",
expected = schema_col_type,
expected = existing_col_type,
got = field_col_type,
),
});
}
field_set.push(Field::new(col_id, field_val));
} else {
let col_id = ColumnId::new();
columns.push((
field_name.to_string(),
col_id,
Arc::from(field_name.as_str()),
influx_column_type_from_field_value(field_val),
));
field_set.push(Field::new(col_id, field_val));
}
}
// qualify the timestamp:
let time_col_id = table_def
.column_name_to_id(TIME_COLUMN_NAME)
.unwrap_or_else(ColumnId::new);
let timestamp = (time_col_id, line.timestamp);
// if we have new columns defined, add them to the db_schema table so that subsequent lines
// won't try to add the same definitions. Collect these additions into a catalog op, which
// will be applied to the catalog with any other ops after all lines in the write request
@ -325,56 +350,81 @@ fn validate_v3_line<'a>(
if !columns.is_empty() {
let database_name = Arc::clone(&db_schema.name);
let database_id = db_schema.id;
let t = db_schema.to_mut().tables.get_mut(&table_id).unwrap();
let db_schema = db_schema.to_mut();
let mut new_table_def = db_schema
.tables
.get_mut(&table_id)
.unwrap()
.as_ref()
.clone();
let mut fields = Vec::with_capacity(columns.len());
for (name, influx_type) in &columns {
fields.push(FieldDefinition {
name: name.as_str().into(),
data_type: FieldDataType::from(influx_type),
});
for (id, name, influx_type) in columns.iter() {
fields.push(FieldDefinition::new(*id, Arc::clone(name), influx_type));
}
catalog_op = Some(CatalogOp::AddFields(FieldAdditions {
database_id,
database_name,
table_id: t.table_id,
table_name: Arc::clone(&t.table_name),
table_id: new_table_def.table_id,
table_name: Arc::clone(&new_table_def.table_name),
field_definitions: fields,
}));
t.add_columns(columns).map_err(|e| WriteLineError {
original_line: raw_line.to_string(),
line_number: line_number + 1,
error_message: e.to_string(),
})?;
new_table_def
.add_columns(columns)
.map_err(|e| WriteLineError {
original_line: raw_line.to_string(),
line_number: line_number + 1,
error_message: e.to_string(),
})?;
db_schema.insert_table(table_id, Arc::new(new_table_def));
}
QualifiedLine {
table_id,
tag_set,
field_set,
timestamp,
}
} else {
let table_id = TableId::new();
let mut columns = Vec::new();
let mut key = Vec::new();
if let Some(series_key) = &line.series.series_key {
for (sk, _) in series_key.iter() {
key.push(sk.to_string());
columns.push((sk.to_string(), InfluxColumnType::Tag));
let tag_set = if let Some(series_key) = &line.series.series_key {
let mut ts = Vec::with_capacity(series_key.len());
for (sk, sv) in series_key.iter() {
let col_id = ColumnId::new();
key.push(col_id);
columns.push((col_id, Arc::from(sk.as_str()), InfluxColumnType::Tag));
ts.push(Field::new(col_id, sv));
}
}
Some(ts)
} else {
None
};
let mut field_set = Vec::with_capacity(line.field_set.len());
for (field_name, field_val) in line.field_set.iter() {
let col_id = ColumnId::new();
columns.push((
field_name.to_string(),
col_id,
Arc::from(field_name.as_str()),
influx_column_type_from_field_value(field_val),
));
field_set.push(Field::new(col_id, field_val));
}
// Always add time last on new table:
columns.push((TIME_COLUMN_NAME.to_string(), InfluxColumnType::Timestamp));
let time_col_id = ColumnId::new();
columns.push((
time_col_id,
Arc::from(TIME_COLUMN_NAME),
InfluxColumnType::Timestamp,
));
let timestamp = (time_col_id, line.timestamp);
let table_name = table_name.into();
let mut fields = Vec::with_capacity(columns.len());
for (name, influx_type) in &columns {
fields.push(FieldDefinition {
name: name.as_str().into(),
data_type: FieldDataType::from(influx_type),
});
for (id, name, influx_type) in &columns {
fields.push(FieldDefinition::new(*id, Arc::clone(name), influx_type));
}
let table = TableDefinition::new(
@@ -399,14 +449,20 @@ fn validate_v3_line<'a>(
});
catalog_op = Some(table_definition_op);
let db_schema = db_schema.to_mut();
assert!(
db_schema.to_mut().tables.insert(table_id, table).is_none(),
db_schema.insert_table(table_id, Arc::new(table)).is_none(),
"attempted to overwrite existing table"
);
db_schema.to_mut().table_map.insert(table_id, table_name);
}
QualifiedLine {
table_id,
tag_set,
field_set,
timestamp,
}
};
Ok((line, catalog_op))
Ok((qualified, catalog_op))
}
/// Validate a line of line protocol against the given schema definition
@@ -416,14 +472,15 @@ fn validate_v3_line<'a>(
///
/// An error will also be produced if the write, which is for the v1 data model, is targeting
/// a v3 table.
fn validate_v1_line<'a>(
fn validate_and_qualify_v1_line(
db_schema: &mut Cow<'_, DatabaseSchema>,
line_number: usize,
line: ParsedLine<'a>,
) -> Result<(ParsedLine<'a>, Option<CatalogOp>), WriteLineError> {
line: ParsedLine,
_raw_line: &str,
) -> Result<(QualifiedLine, Option<CatalogOp>), WriteLineError> {
let mut catalog_op = None;
let table_name = line.series.measurement.as_str();
if let Some(table_def) = db_schema.table_definition(table_name) {
let qualified = if let Some(table_def) = db_schema.table_definition(table_name) {
if table_def.is_v3() {
return Err(WriteLineError {
original_line: line.to_string(),
@@ -433,39 +490,58 @@ fn validate_v1_line<'a>(
});
}
// This table already exists, so update with any new columns if present:
let mut columns = Vec::with_capacity(line.column_count() + 1);
if let Some(tag_set) = &line.series.tag_set {
for (tag_key, _) in tag_set {
if !table_def.column_exists(tag_key) {
columns.push((tag_key.to_string(), InfluxColumnType::Tag));
let mut columns = ColumnTracker::with_capacity(line.column_count() + 1);
let tag_set = if let Some(tag_set) = &line.series.tag_set {
let mut ts = Vec::with_capacity(tag_set.len());
for (tag_key, tag_val) in tag_set {
if let Some(col_id) = table_def.column_name_to_id(tag_key.as_str()) {
ts.push(Field::new(col_id, FieldData::Tag(tag_val.to_string())));
} else {
let col_id = ColumnId::new();
columns.push((col_id, Arc::from(tag_key.as_str()), InfluxColumnType::Tag));
ts.push(Field::new(col_id, FieldData::Tag(tag_val.to_string())));
}
}
}
Some(ts)
} else {
None
};
let mut field_set = Vec::with_capacity(line.field_set.len());
for (field_name, field_val) in line.field_set.iter() {
// This field already exists, so check the incoming type matches existing type:
if let Some(schema_col_type) = table_def.field_type_by_name(field_name) {
if let Some((col_id, col_def)) = table_def.column_def_and_id(field_name.as_str()) {
let field_col_type = influx_column_type_from_field_value(field_val);
if field_col_type != schema_col_type {
let existing_col_type = col_def.data_type;
if field_col_type != existing_col_type {
let field_name = field_name.to_string();
return Err(WriteLineError {
original_line: line.to_string(),
line_number: line_number + 1,
error_message: format!(
"invalid field value in line protocol for field '{field_name}' on line \
{line_number}: expected type {expected}, but got {got}",
expected = schema_col_type,
got = field_col_type,
),
"invalid field value in line protocol for field '{field_name}' on line \
{line_number}: expected type {expected}, but got {got}",
expected = existing_col_type,
got = field_col_type,
),
});
}
field_set.push(Field::new(col_id, field_val));
} else {
let col_id = ColumnId::new();
columns.push((
field_name.to_string(),
col_id,
Arc::from(field_name.as_str()),
influx_column_type_from_field_value(field_val),
));
field_set.push(Field::new(col_id, field_val));
}
}
let time_col_id = table_def
.column_name_to_id(TIME_COLUMN_NAME)
.unwrap_or_default();
let timestamp = (time_col_id, line.timestamp);
// if we have new columns defined, add them to the db_schema table so that subsequent lines
// won't try to add the same definitions. Collect these additions into a catalog op, which
// will be applied to the catalog with any other ops after all lines in the write request
@@ -477,20 +553,26 @@ fn validate_v1_line<'a>(
let table_id = table_def.table_id;
let mut fields = Vec::with_capacity(columns.len());
for (name, influx_type) in &columns {
fields.push(FieldDefinition {
name: name.as_str().into(),
data_type: FieldDataType::from(influx_type),
});
for (id, name, influx_type) in &columns {
fields.push(FieldDefinition::new(*id, Arc::clone(name), influx_type));
}
// unwrap is safe due to the surrounding if let condition:
let t = db_schema.to_mut().tables.get_mut(&table_id).unwrap();
t.add_columns(columns).map_err(|e| WriteLineError {
original_line: line.to_string(),
line_number: line_number + 1,
error_message: e.to_string(),
})?;
let db_schema = db_schema.to_mut();
let mut new_table_def = db_schema
.tables
.get_mut(&table_id)
// unwrap is safe due to the surrounding if let condition:
.unwrap()
.as_ref()
.clone();
new_table_def
.add_columns(columns)
.map_err(|e| WriteLineError {
original_line: line.to_string(),
line_number: line_number + 1,
error_message: e.to_string(),
})?;
db_schema.insert_table(table_id, Arc::new(new_table_def));
catalog_op = Some(CatalogOp::AddFields(FieldAdditions {
database_name,
@@ -500,32 +582,51 @@ fn validate_v1_line<'a>(
field_definitions: fields,
}));
}
QualifiedLine {
table_id: table_def.table_id,
tag_set,
field_set,
timestamp,
}
} else {
let table_id = TableId::new();
// This is a new table, so build up its columns:
let mut columns = Vec::new();
if let Some(tag_set) = &line.series.tag_set {
for (tag_key, _) in tag_set {
columns.push((tag_key.to_string(), InfluxColumnType::Tag));
let tag_set = if let Some(tag_set) = &line.series.tag_set {
let mut ts = Vec::with_capacity(tag_set.len());
for (tag_key, tag_val) in tag_set {
let col_id = ColumnId::new();
ts.push(Field::new(col_id, FieldData::Tag(tag_val.to_string())));
columns.push((col_id, Arc::from(tag_key.as_str()), InfluxColumnType::Tag));
}
}
Some(ts)
} else {
None
};
let mut field_set = Vec::with_capacity(line.field_set.len());
for (field_name, field_val) in &line.field_set {
let col_id = ColumnId::new();
columns.push((
field_name.to_string(),
col_id,
Arc::from(field_name.as_str()),
influx_column_type_from_field_value(field_val),
));
field_set.push(Field::new(col_id, field_val));
}
// Always add time last on new table:
columns.push((TIME_COLUMN_NAME.to_string(), InfluxColumnType::Timestamp));
let time_col_id = ColumnId::new();
columns.push((
time_col_id,
Arc::from(TIME_COLUMN_NAME),
InfluxColumnType::Timestamp,
));
let timestamp = (time_col_id, line.timestamp);
let table_name = table_name.into();
let mut fields = Vec::with_capacity(columns.len());
for (name, influx_type) in &columns {
fields.push(FieldDefinition {
name: name.as_str().into(),
data_type: FieldDataType::from(influx_type),
});
for (id, name, influx_type) in &columns {
fields.push(FieldDefinition::new(*id, Arc::clone(name), influx_type));
}
catalog_op = Some(CatalogOp::CreateTable(influxdb3_wal::TableDefinition {
table_id,
@@ -536,22 +637,22 @@ fn validate_v1_line<'a>(
key: None,
}));
let table = TableDefinition::new(
table_id,
Arc::clone(&table_name),
columns,
Option::<Vec<String>>::None,
)
.unwrap();
let table = TableDefinition::new(table_id, Arc::clone(&table_name), columns, None).unwrap();
let db_schema = db_schema.to_mut();
assert!(
db_schema.to_mut().tables.insert(table_id, table).is_none(),
db_schema.insert_table(table_id, Arc::new(table)).is_none(),
"attempted to overwrite existing table"
);
db_schema.to_mut().table_map.insert(table_id, table_name);
}
QualifiedLine {
table_id,
tag_set,
field_set,
timestamp,
}
};
Ok((line, catalog_op))
Ok((qualified, catalog_op))
}
/// Result of conversion from line protocol to valid chunked data
@@ -572,7 +673,7 @@ pub(crate) struct ValidatedLines {
pub(crate) catalog_updates: Option<CatalogBatch>,
}
impl<'lp> WriteValidator<LinesParsed<'lp, v3::ParsedLine<'lp>>> {
impl WriteValidator<LinesParsed> {
/// Convert a set of valid parsed `v3` lines to a [`ValidatedLines`] which will
/// be buffered and written to the WAL, if configured.
///
@@ -585,22 +686,16 @@ impl<'lp> WriteValidator<LinesParsed<'lp, v3::ParsedLine<'lp>>> {
gen1_duration: Gen1Duration,
precision: Precision,
) -> ValidatedLines {
let mut table_chunks = HashMap::new();
let mut table_chunks = IndexMap::new();
let line_count = self.state.lines.len();
let mut field_count = 0;
let mut series_key_count = 0;
let mut index_count = 0;
for (line, _raw_line) in self.state.lines.into_iter() {
for line in self.state.lines.into_iter() {
field_count += line.field_set.len();
series_key_count += line
.series
.series_key
.as_ref()
.map(|sk| sk.len())
.unwrap_or(0);
index_count += line.tag_set.as_ref().map(|tags| tags.len()).unwrap_or(0);
convert_v3_parsed_line(
Arc::clone(&self.state.db_schema),
convert_qualified_line(
line,
&mut table_chunks,
ingest_time,
@@ -618,7 +713,7 @@ impl<'lp> WriteValidator<LinesParsed<'lp, v3::ParsedLine<'lp>>> {
ValidatedLines {
line_count,
field_count,
index_count: series_key_count,
index_count,
errors: self.state.errors,
valid_data: write_batch,
catalog_updates: self.state.catalog_batch,
@@ -626,10 +721,9 @@ impl<'lp> WriteValidator<LinesParsed<'lp, v3::ParsedLine<'lp>>> {
}
}
fn convert_v3_parsed_line(
db_schema: Arc<DatabaseSchema>,
line: v3::ParsedLine<'_>,
table_chunk_map: &mut HashMap<TableId, TableChunks>,
fn convert_qualified_line(
line: QualifiedLine,
table_chunk_map: &mut IndexMap<TableId, TableChunks>,
ingest_time: Time,
gen1_duration: Gen1Duration,
precision: Precision,
@@ -638,41 +732,27 @@ fn convert_v3_parsed_line(
let mut fields = Vec::with_capacity(line.column_count() + 1);
// Add series key columns:
if let Some(series_key) = line.series.series_key {
for (sk, sv) in series_key.iter() {
fields.push(Field {
name: sk.to_string().into(),
value: sv.into(),
});
}
if let Some(tag_set) = line.tag_set {
fields.extend(tag_set);
}
// Add fields columns:
for (name, val) in line.field_set {
fields.push(Field {
name: name.to_string().into(),
value: val.into(),
});
}
fields.extend(line.field_set);
// Add time column:
// TODO: change the default time resolution to microseconds in v3
let time_value_nanos = line
.timestamp
.1
.map(|ts| apply_precision_to_timestamp(precision, ts))
.unwrap_or(ingest_time.timestamp_nanos());
fields.push(Field {
name: TIME_COLUMN_NAME.to_string().into(),
value: FieldData::Timestamp(time_value_nanos),
});
fields.push(Field::new(
line.timestamp.0,
FieldData::Timestamp(time_value_nanos),
));
// Add the row into the correct chunk in the table
let chunk_time = gen1_duration.chunk_time_for_timestamp(Timestamp::new(time_value_nanos));
let table_name = line.series.measurement.as_str();
let table_id = db_schema
.table_name_to_id(table_name)
.expect("table should exist by this point");
let table_chunks = table_chunk_map.entry(table_id).or_default();
let table_chunks = table_chunk_map.entry(line.table_id).or_default();
table_chunks.push_row(
chunk_time,
Row {
@@ -682,119 +762,17 @@ fn convert_v3_parsed_line(
);
}
impl<'lp> WriteValidator<LinesParsed<'lp, ParsedLine<'lp>>> {
/// Convert a set of valid parsed lines to a [`ValidatedLines`] which will
/// be buffered and written to the WAL, if configured.
///
/// This involves splitting out the writes into different batches for each chunk, which will
/// map to the `Gen1Duration`. This function should be infallible, because
/// the schema for incoming writes has been fully validated.
pub(crate) fn convert_lines_to_buffer(
self,
ingest_time: Time,
gen1_duration: Gen1Duration,
precision: Precision,
) -> ValidatedLines {
let mut table_chunks = HashMap::new();
let line_count = self.state.lines.len();
let mut field_count = 0;
let mut tag_count = 0;
for (line, _raw_line) in self.state.lines.into_iter() {
field_count += line.field_set.len();
tag_count += line.series.tag_set.as_ref().map(|t| t.len()).unwrap_or(0);
convert_v1_parsed_line(
Arc::clone(&self.state.db_schema),
line,
&mut table_chunks,
ingest_time,
gen1_duration,
precision,
);
}
let write_batch = WriteBatch::new(
self.state.catalog.db_schema.id,
Arc::clone(&self.state.catalog.db_schema.name),
table_chunks,
);
ValidatedLines {
line_count,
field_count,
index_count: tag_count,
errors: self.state.errors,
valid_data: write_batch,
catalog_updates: self.state.catalog_batch,
}
}
struct QualifiedLine {
table_id: TableId,
tag_set: Option<Vec<Field>>,
field_set: Vec<Field>,
timestamp: (ColumnId, Option<i64>),
}
fn convert_v1_parsed_line(
db_schema: Arc<DatabaseSchema>,
line: ParsedLine<'_>,
table_chunk_map: &mut HashMap<TableId, TableChunks>,
ingest_time: Time,
gen1_duration: Gen1Duration,
precision: Precision,
) {
// now that we've ensured all columns exist in the schema, construct the actual row and values
// while validating the column types match.
let mut values = Vec::with_capacity(line.column_count() + 1);
// validate tags, collecting any new ones that must be inserted, or adding the values
if let Some(tag_set) = line.series.tag_set {
for (tag_key, value) in tag_set {
let value = Field {
name: tag_key.to_string().into(),
value: FieldData::Tag(value.to_string()),
};
values.push(value);
}
impl QualifiedLine {
fn column_count(&self) -> usize {
self.tag_set.as_ref().map(|ts| ts.len()).unwrap_or(0) + self.field_set.len() + 1
}
// validate fields, collecting any new ones that must be inserted, or adding values
for (field_name, value) in line.field_set {
let field_data = match value {
FieldValue::I64(v) => FieldData::Integer(v),
FieldValue::F64(v) => FieldData::Float(v),
FieldValue::U64(v) => FieldData::UInteger(v),
FieldValue::Boolean(v) => FieldData::Boolean(v),
FieldValue::String(v) => FieldData::String(v.to_string()),
};
let value = Field {
name: field_name.to_string().into(),
value: field_data,
};
values.push(value);
}
// set the time value
let time_value_nanos = line
.timestamp
.map(|ts| apply_precision_to_timestamp(precision, ts))
.unwrap_or(ingest_time.timestamp_nanos());
let chunk_time = gen1_duration.chunk_time_for_timestamp(Timestamp::new(time_value_nanos));
values.push(Field {
name: TIME_COLUMN_NAME.to_string().into(),
value: FieldData::Timestamp(time_value_nanos),
});
let table_name: Arc<str> = line.series.measurement.to_string().into();
let table_id = db_schema
.table_name_to_id(table_name)
.expect("table should exist by this point");
let table_chunks = table_chunk_map.entry(table_id).or_default();
table_chunks.push_row(
chunk_time,
Row {
time: time_value_nanos,
fields: values,
},
);
}
fn apply_precision_to_timestamp(precision: Precision, ts: i64) -> i64 {