refactor: add dedicated type for serializaing catalog tables (#25042)
Remove reliance on data_types::ColumnType Introduce TableSnapshot for serializing table information in the catalog. Remove the columns BTree from the TableDefinition an use the schema directly. BTrees are still used to ensure column ordering when tables are created, or columns added to existing tables. The custom Deserialize impl on TableDefinition used to block duplicate column definitions in the serialized data. This preserves that bevaviour using serde_with and extends it to the other types in the catalog, namely InnerCatalog and DatabaseSchema. The serialization test for the catalog was extended to include multiple tables in a database and multiple columns spanning the range of available types in each table. Snapshot testing was introduced using the insta crate to check the serialized JSON form of the catalog, and help catch breaking changes when introducing features to the catalog. Added a test that verifies the no-duplicate key rules when deserializing the map components in the Catalogpull/25044/head
parent
faab7a0abc
commit
039dea2264
File diff suppressed because it is too large
Load Diff
|
@ -61,6 +61,7 @@ hex = "0.4.3"
|
|||
http = "0.2.9"
|
||||
humantime = "2.1.0"
|
||||
hyper = "0.14"
|
||||
insta = { version = "1.39", features = ["json"] }
|
||||
libc = { version = "0.2" }
|
||||
mockito = { version = "1.2.0", default-features = false }
|
||||
num_cpus = "1.16.0"
|
||||
|
@ -82,6 +83,7 @@ secrecy = "0.8.0"
|
|||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
serde_urlencoded = "0.7.0"
|
||||
serde_with = "3.8.1"
|
||||
sha2 = "0.10.8"
|
||||
snap = "1.0.0"
|
||||
sqlparser = "0.41.0"
|
||||
|
|
|
@ -34,6 +34,7 @@ parking_lot.workspace = true
|
|||
parquet.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
serde_with.workspace = true
|
||||
sha2.workspace = true
|
||||
snap.workspace = true
|
||||
thiserror.workspace = true
|
||||
|
@ -44,6 +45,7 @@ uuid.workspace = true
|
|||
[dev-dependencies]
|
||||
# Core Crates
|
||||
arrow_util.workspace = true
|
||||
insta.workspace = true
|
||||
metric.workspace = true
|
||||
pretty_assertions.workspace = true
|
||||
test_helpers.workspace = true
|
||||
|
|
|
@ -1,17 +1,17 @@
|
|||
//! Implementation of the Catalog that sits entirely in memory.
|
||||
|
||||
use crate::SequenceNumber;
|
||||
use data_types::ColumnType;
|
||||
use influxdb_line_protocol::FieldValue;
|
||||
use observability_deps::tracing::info;
|
||||
use parking_lot::RwLock;
|
||||
use schema::{InfluxColumnType, InfluxFieldType, Schema, SchemaBuilder};
|
||||
use serde::de::Visitor;
|
||||
use serde::{Deserialize, Deserializer, Serialize};
|
||||
use serde::{Deserialize, Serialize, Serializer};
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::fmt;
|
||||
use std::sync::Arc;
|
||||
use thiserror::Error;
|
||||
|
||||
mod serialize;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum Error {
|
||||
#[error("catalog updated elsewhere")]
|
||||
|
@ -51,6 +51,21 @@ impl Default for Catalog {
|
|||
}
|
||||
}
|
||||
|
||||
impl PartialEq for Catalog {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.inner.read().eq(&other.inner.read())
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for Catalog {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
self.inner.read().serialize(serializer)
|
||||
}
|
||||
}
|
||||
|
||||
impl Catalog {
|
||||
/// Limit for the number of Databases that InfluxDB Edge can have
|
||||
pub(crate) const NUM_DBS_LIMIT: usize = 5;
|
||||
|
@ -97,7 +112,7 @@ impl Catalog {
|
|||
}
|
||||
|
||||
for table in db.tables.values() {
|
||||
if table.columns.len() > Self::NUM_COLUMNS_PER_TABLE_LIMIT {
|
||||
if table.num_columns() > Self::NUM_COLUMNS_PER_TABLE_LIMIT {
|
||||
return Err(Error::TooManyColumns);
|
||||
}
|
||||
}
|
||||
|
@ -144,10 +159,6 @@ impl Catalog {
|
|||
self.inner.read().databases.get(name).cloned()
|
||||
}
|
||||
|
||||
pub fn into_inner(self) -> InnerCatalog {
|
||||
self.inner.into_inner()
|
||||
}
|
||||
|
||||
pub fn sequence_number(&self) -> SequenceNumber {
|
||||
self.inner.read().sequence
|
||||
}
|
||||
|
@ -159,11 +170,18 @@ impl Catalog {
|
|||
pub fn list_databases(&self) -> Vec<String> {
|
||||
self.inner.read().databases.keys().cloned().collect()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn db_exists(&self, db_name: &str) -> bool {
|
||||
self.inner.read().db_exists(db_name)
|
||||
}
|
||||
}
|
||||
|
||||
#[serde_with::serde_as]
|
||||
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Default)]
|
||||
pub struct InnerCatalog {
|
||||
/// The catalog is a map of databases with their table schemas
|
||||
#[serde_as(as = "serde_with::MapPreventDuplicates<_, _>")]
|
||||
databases: HashMap<String, Arc<DatabaseSchema>>,
|
||||
sequence: SequenceNumber,
|
||||
}
|
||||
|
@ -186,10 +204,12 @@ impl InnerCatalog {
|
|||
}
|
||||
}
|
||||
|
||||
#[serde_with::serde_as]
|
||||
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
|
||||
pub struct DatabaseSchema {
|
||||
pub name: String,
|
||||
/// The database is a map of tables
|
||||
#[serde_as(as = "serde_with::MapPreventDuplicates<_, _>")]
|
||||
pub(crate) tables: BTreeMap<String, TableDefinition>,
|
||||
}
|
||||
|
||||
|
@ -218,112 +238,74 @@ impl DatabaseSchema {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Eq, PartialEq, Clone)]
|
||||
#[derive(Debug, Eq, PartialEq, Clone)]
|
||||
pub struct TableDefinition {
|
||||
pub name: String,
|
||||
#[serde(skip_serializing, skip_deserializing)]
|
||||
pub schema: Schema,
|
||||
columns: BTreeMap<String, i16>,
|
||||
}
|
||||
|
||||
struct TableDefinitionVisitor;
|
||||
|
||||
impl<'de> Visitor<'de> for TableDefinitionVisitor {
|
||||
type Value = TableDefinition;
|
||||
|
||||
fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
formatter.write_str("struct TableDefinition")
|
||||
}
|
||||
|
||||
fn visit_map<V>(self, mut map: V) -> Result<TableDefinition, V::Error>
|
||||
where
|
||||
V: serde::de::MapAccess<'de>,
|
||||
{
|
||||
let mut name = None;
|
||||
let mut columns = None;
|
||||
while let Some(key) = map.next_key::<String>()? {
|
||||
match key.as_str() {
|
||||
"name" => {
|
||||
if name.is_some() {
|
||||
return Err(serde::de::Error::duplicate_field("name"));
|
||||
}
|
||||
name = Some(map.next_value::<String>()?);
|
||||
}
|
||||
"columns" => {
|
||||
if columns.is_some() {
|
||||
return Err(serde::de::Error::duplicate_field("columns"));
|
||||
}
|
||||
columns = Some(map.next_value::<BTreeMap<String, i16>>()?);
|
||||
}
|
||||
_ => {
|
||||
let _ = map.next_value::<serde::de::IgnoredAny>()?;
|
||||
}
|
||||
}
|
||||
}
|
||||
let name = name.ok_or_else(|| serde::de::Error::missing_field("name"))?;
|
||||
let columns = columns.ok_or_else(|| serde::de::Error::missing_field("columns"))?;
|
||||
|
||||
Ok(TableDefinition::new(name, columns))
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for TableDefinition {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
{
|
||||
deserializer.deserialize_map(TableDefinitionVisitor)
|
||||
}
|
||||
}
|
||||
|
||||
impl TableDefinition {
|
||||
pub(crate) fn new(name: impl Into<String>, columns: BTreeMap<String, i16>) -> Self {
|
||||
let mut schema_builder = SchemaBuilder::with_capacity(columns.len());
|
||||
for (name, column_type) in &columns {
|
||||
schema_builder.influx_column(
|
||||
name,
|
||||
column_type_to_influx_column_type(&ColumnType::try_from(*column_type).unwrap()),
|
||||
);
|
||||
/// Create a new [`TableDefinition`]
|
||||
///
|
||||
/// Ensures the the provided columns will be ordered before constructing the schema.
|
||||
pub(crate) fn new<N: Into<String>, CN: AsRef<str>>(
|
||||
name: N,
|
||||
columns: impl AsRef<[(CN, InfluxColumnType)]>,
|
||||
) -> Self {
|
||||
// Use a BTree to ensure that the columns are ordered:
|
||||
let mut ordered_columns = BTreeMap::new();
|
||||
for (name, column_type) in columns.as_ref() {
|
||||
ordered_columns.insert(name.as_ref(), column_type);
|
||||
}
|
||||
let mut schema_builder = SchemaBuilder::with_capacity(columns.as_ref().len());
|
||||
let name = name.into();
|
||||
// TODO: may need to capture some schema-level metadata, currently, this causes trouble in
|
||||
// tests, so I am omitting this for now:
|
||||
// schema_builder.measurement(&name);
|
||||
for (name, column_type) in ordered_columns {
|
||||
schema_builder.influx_column(name, *column_type);
|
||||
}
|
||||
let schema = schema_builder.build().unwrap();
|
||||
|
||||
Self {
|
||||
name: name.into(),
|
||||
schema,
|
||||
columns,
|
||||
}
|
||||
Self { name, schema }
|
||||
}
|
||||
|
||||
/// Check if the column exists in the [`TableDefinition`]s schema
|
||||
pub(crate) fn column_exists(&self, column: &str) -> bool {
|
||||
self.columns.contains_key(column)
|
||||
self.schema.find_index_of(column).is_some()
|
||||
}
|
||||
|
||||
pub(crate) fn add_columns(&mut self, columns: Vec<(String, i16)>) {
|
||||
for (name, column_type) in columns.into_iter() {
|
||||
self.columns.insert(name, column_type);
|
||||
/// Add the columns to this [`TableDefinition`]
|
||||
///
|
||||
/// This ensures that the resulting schema has its columns ordered
|
||||
pub(crate) fn add_columns(&mut self, columns: Vec<(String, InfluxColumnType)>) {
|
||||
// Use BTree to insert existing and new columns, and use that to generate the
|
||||
// resulting schema, to ensure column order is consistent:
|
||||
let mut cols = BTreeMap::new();
|
||||
for (col_type, field) in self.schema.iter() {
|
||||
cols.insert(field.name(), col_type);
|
||||
}
|
||||
|
||||
let mut schema_builder = SchemaBuilder::with_capacity(self.columns.len());
|
||||
for (name, column_type) in &self.columns {
|
||||
schema_builder.influx_column(
|
||||
name,
|
||||
column_type_to_influx_column_type(&ColumnType::try_from(*column_type).unwrap()),
|
||||
);
|
||||
for (name, column_type) in columns.iter() {
|
||||
cols.insert(name, *column_type);
|
||||
}
|
||||
let mut schema_builder = SchemaBuilder::with_capacity(columns.len());
|
||||
// TODO: may need to capture some schema-level metadata, currently, this causes trouble in
|
||||
// tests, so I am omitting this for now:
|
||||
// schema_builder.measurement(&self.name);
|
||||
for (name, col_type) in cols {
|
||||
schema_builder.influx_column(name, col_type);
|
||||
}
|
||||
let schema = schema_builder.build().unwrap();
|
||||
|
||||
self.schema = schema;
|
||||
}
|
||||
|
||||
pub(crate) fn index_columns(&self) -> Vec<String> {
|
||||
self.columns
|
||||
pub(crate) fn index_columns(&self) -> Vec<&str> {
|
||||
self.schema
|
||||
.iter()
|
||||
.filter_map(|(name, column_type)| {
|
||||
if *column_type == ColumnType::Tag as i16 {
|
||||
Some(name.clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
.filter_map(|(col_type, field)| match col_type {
|
||||
InfluxColumnType::Tag => Some(field.name().as_str()),
|
||||
InfluxColumnType::Field(_) | InfluxColumnType::Timestamp => None,
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
@ -333,52 +315,157 @@ impl TableDefinition {
|
|||
&self.schema
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn columns(&self) -> &BTreeMap<String, i16> {
|
||||
&self.columns
|
||||
pub(crate) fn num_columns(&self) -> usize {
|
||||
self.schema.len()
|
||||
}
|
||||
}
|
||||
|
||||
fn column_type_to_influx_column_type(column_type: &ColumnType) -> InfluxColumnType {
|
||||
match column_type {
|
||||
ColumnType::I64 => InfluxColumnType::Field(InfluxFieldType::Integer),
|
||||
ColumnType::U64 => InfluxColumnType::Field(InfluxFieldType::UInteger),
|
||||
ColumnType::F64 => InfluxColumnType::Field(InfluxFieldType::Float),
|
||||
ColumnType::Bool => InfluxColumnType::Field(InfluxFieldType::Boolean),
|
||||
ColumnType::String => InfluxColumnType::Field(InfluxFieldType::String),
|
||||
ColumnType::Time => InfluxColumnType::Timestamp,
|
||||
ColumnType::Tag => InfluxColumnType::Tag,
|
||||
pub fn influx_column_type_from_field_value(fv: &FieldValue<'_>) -> InfluxColumnType {
|
||||
match fv {
|
||||
FieldValue::I64(_) => InfluxColumnType::Field(InfluxFieldType::Integer),
|
||||
FieldValue::U64(_) => InfluxColumnType::Field(InfluxFieldType::UInteger),
|
||||
FieldValue::F64(_) => InfluxColumnType::Field(InfluxFieldType::Float),
|
||||
FieldValue::String(_) => InfluxColumnType::Field(InfluxFieldType::String),
|
||||
FieldValue::Boolean(_) => InfluxColumnType::Field(InfluxFieldType::Boolean),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use insta::assert_json_snapshot;
|
||||
use test_helpers::assert_contains;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn catalog_serialization() {
|
||||
let catalog = Catalog::new();
|
||||
let mut database = DatabaseSchema {
|
||||
name: "test".to_string(),
|
||||
name: "test_db".to_string(),
|
||||
tables: BTreeMap::new(),
|
||||
};
|
||||
use InfluxColumnType::*;
|
||||
use InfluxFieldType::*;
|
||||
database.tables.insert(
|
||||
"test".into(),
|
||||
"test_table_1".into(),
|
||||
TableDefinition::new(
|
||||
"test",
|
||||
BTreeMap::from([("test".to_string(), ColumnType::String as i16)]),
|
||||
"test_table_1",
|
||||
[
|
||||
("tag_1", Tag),
|
||||
("tag_2", Tag),
|
||||
("tag_3", Tag),
|
||||
("time", Timestamp),
|
||||
("string_field", Field(String)),
|
||||
("bool_field", Field(Boolean)),
|
||||
("i64_field", Field(Integer)),
|
||||
("u64_field", Field(UInteger)),
|
||||
("f64_field", Field(Float)),
|
||||
],
|
||||
),
|
||||
);
|
||||
database.tables.insert(
|
||||
"test_table_2".into(),
|
||||
TableDefinition::new(
|
||||
"test_table_2",
|
||||
[
|
||||
("tag_1", Tag),
|
||||
("tag_2", Tag),
|
||||
("tag_3", Tag),
|
||||
("time", Timestamp),
|
||||
("string_field", Field(String)),
|
||||
("bool_field", Field(Boolean)),
|
||||
("i64_field", Field(Integer)),
|
||||
("u64_field", Field(UInteger)),
|
||||
("f64_field", Field(Float)),
|
||||
],
|
||||
),
|
||||
);
|
||||
let database = Arc::new(database);
|
||||
catalog
|
||||
.replace_database(SequenceNumber::new(0), database)
|
||||
.unwrap();
|
||||
let inner = catalog.inner.read();
|
||||
|
||||
let serialized = serde_json::to_string(&*inner).unwrap();
|
||||
let deserialized: InnerCatalog = serde_json::from_str(&serialized).unwrap();
|
||||
// Perform a snapshot test to check that the JSON serialized catalog does not change in an
|
||||
// undesired way when introducing features etc.
|
||||
assert_json_snapshot!(catalog);
|
||||
|
||||
assert_eq!(*inner, deserialized);
|
||||
// Serialize/deserialize to ensure roundtrip to/from JSON
|
||||
let serialized = serde_json::to_string(&catalog).unwrap();
|
||||
let deserialized_inner: InnerCatalog = serde_json::from_str(&serialized).unwrap();
|
||||
let deserialized = Catalog::from_inner(deserialized_inner);
|
||||
assert_eq!(catalog, deserialized);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn invalid_catalog_deserialization() {
|
||||
// Duplicate databases
|
||||
{
|
||||
let json = r#"{
|
||||
"databases": {
|
||||
"db1": {
|
||||
"name": "db1",
|
||||
"tables": {}
|
||||
},
|
||||
"db1": {
|
||||
"name": "db1",
|
||||
"tables": {}
|
||||
}
|
||||
}
|
||||
}"#;
|
||||
let err = serde_json::from_str::<InnerCatalog>(json).unwrap_err();
|
||||
assert_contains!(err.to_string(), "found duplicate key");
|
||||
}
|
||||
// Duplicate tables
|
||||
{
|
||||
let json = r#"{
|
||||
"databases": {
|
||||
"db1": {
|
||||
"name": "db1",
|
||||
"tables": {
|
||||
"tbl1": {
|
||||
"name": "tbl1",
|
||||
"cols": {}
|
||||
},
|
||||
"tbl1": {
|
||||
"name": "tbl1",
|
||||
"cols": {}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}"#;
|
||||
let err = serde_json::from_str::<InnerCatalog>(json).unwrap_err();
|
||||
assert_contains!(err.to_string(), "found duplicate key");
|
||||
}
|
||||
// Duplicate columns
|
||||
{
|
||||
let json = r#"{
|
||||
"databases": {
|
||||
"db1": {
|
||||
"name": "db1",
|
||||
"tables": {
|
||||
"tbl1": {
|
||||
"name": "tbl1",
|
||||
"cols": {
|
||||
"col1": {
|
||||
"type": "i64",
|
||||
"influx_type": "field",
|
||||
"nullable": true
|
||||
},
|
||||
"col1": {
|
||||
"type": "u64",
|
||||
"influx_type": "field",
|
||||
"nullable": true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}"#;
|
||||
let err = serde_json::from_str::<InnerCatalog>(json).unwrap_err();
|
||||
assert_contains!(err.to_string(), "found duplicate key");
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -391,12 +478,15 @@ mod tests {
|
|||
"test".into(),
|
||||
TableDefinition::new(
|
||||
"test",
|
||||
BTreeMap::from([("test".to_string(), ColumnType::String as i16)]),
|
||||
[(
|
||||
"test".to_string(),
|
||||
InfluxColumnType::Field(InfluxFieldType::String),
|
||||
)],
|
||||
),
|
||||
);
|
||||
|
||||
let table = database.tables.get_mut("test").unwrap();
|
||||
table.add_columns(vec![("test2".to_string(), ColumnType::Tag as i16)]);
|
||||
table.add_columns(vec![("test2".to_string(), InfluxColumnType::Tag)]);
|
||||
let schema = table.schema();
|
||||
assert_eq!(
|
||||
schema.field(0).0,
|
||||
|
|
|
@ -0,0 +1,244 @@
|
|||
use std::collections::BTreeMap;
|
||||
|
||||
use arrow::datatypes::DataType as ArrowDataType;
|
||||
use schema::{InfluxColumnType, SchemaBuilder};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use super::TableDefinition;
|
||||
|
||||
impl Serialize for TableDefinition {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
let snapshot = TableSnapshot::from(self);
|
||||
snapshot.serialize(serializer)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for TableDefinition {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
TableSnapshot::<'de>::deserialize(deserializer).map(Into::into)
|
||||
}
|
||||
}
|
||||
|
||||
/// A snapshot of a [`TableDefinition`] used for serialization of table information from the
|
||||
/// catalog.
|
||||
///
|
||||
/// This is used over serde's `Serialize`/`Deserialize` implementations on the inner `Schema` type
|
||||
/// due to them being considered unstable. This type intends to mimic the structure of the Arrow
|
||||
/// `Schema`, and will help guard against potential breaking changes to the Arrow Schema types.
|
||||
#[serde_with::serde_as]
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct TableSnapshot<'a> {
|
||||
name: &'a str,
|
||||
#[serde_as(as = "serde_with::MapPreventDuplicates<_, _>")]
|
||||
cols: BTreeMap<&'a str, ColumnDefinition<'a>>,
|
||||
}
|
||||
|
||||
/// Representation of Arrow's `DataType` for table snapshots.
|
||||
///
|
||||
/// Uses `#[non_exhaustive]` with the assumption that variants will be added as we support
|
||||
/// more Arrow data types.
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
#[non_exhaustive]
|
||||
enum DataType<'a> {
|
||||
Null,
|
||||
Bool,
|
||||
I8,
|
||||
I16,
|
||||
I32,
|
||||
I64,
|
||||
U8,
|
||||
U16,
|
||||
U32,
|
||||
U64,
|
||||
F16,
|
||||
F32,
|
||||
F64,
|
||||
Str,
|
||||
BigStr,
|
||||
StrView,
|
||||
Bin,
|
||||
BigBin,
|
||||
BinView,
|
||||
Dict(Box<DataType<'a>>, Box<DataType<'a>>),
|
||||
Time(TimeUnit, Option<&'a str>),
|
||||
}
|
||||
|
||||
/// Representation of Arrow's `TimeUnit` for table snapshots.
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
enum TimeUnit {
|
||||
#[serde(rename = "s")]
|
||||
Second,
|
||||
#[serde(rename = "ms")]
|
||||
Millisecond,
|
||||
#[serde(rename = "us")]
|
||||
Microsecond,
|
||||
#[serde(rename = "ns")]
|
||||
Nanosecond,
|
||||
}
|
||||
|
||||
impl From<arrow::datatypes::TimeUnit> for TimeUnit {
|
||||
fn from(arrow_unit: arrow::datatypes::TimeUnit) -> Self {
|
||||
match arrow_unit {
|
||||
arrow::datatypes::TimeUnit::Second => Self::Second,
|
||||
arrow::datatypes::TimeUnit::Millisecond => Self::Millisecond,
|
||||
arrow::datatypes::TimeUnit::Microsecond => Self::Microsecond,
|
||||
arrow::datatypes::TimeUnit::Nanosecond => Self::Nanosecond,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Used to annotate columns in a Schema by their respective type in the Influx Data Model
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
enum InfluxType {
|
||||
Tag,
|
||||
Field,
|
||||
Time,
|
||||
}
|
||||
|
||||
impl From<InfluxColumnType> for InfluxType {
|
||||
fn from(col_type: InfluxColumnType) -> Self {
|
||||
match col_type {
|
||||
InfluxColumnType::Tag => Self::Tag,
|
||||
InfluxColumnType::Field(_) => Self::Field,
|
||||
InfluxColumnType::Timestamp => Self::Time,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The inner column definition for a [`TableSnapshot`]
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct ColumnDefinition<'a> {
|
||||
/// The column's data type
|
||||
#[serde(borrow)]
|
||||
r#type: DataType<'a>,
|
||||
/// The columns Influx type
|
||||
influx_type: InfluxType,
|
||||
/// Whether the column can hold NULL values
|
||||
nullable: bool,
|
||||
}
|
||||
|
||||
impl<'a> From<&'a TableDefinition> for TableSnapshot<'a> {
|
||||
fn from(def: &'a TableDefinition) -> Self {
|
||||
let name = def.name.as_str();
|
||||
let cols = def
|
||||
.schema()
|
||||
.iter()
|
||||
.map(|(col_type, f)| {
|
||||
(
|
||||
f.name().as_str(),
|
||||
ColumnDefinition {
|
||||
r#type: f.data_type().into(),
|
||||
influx_type: col_type.into(),
|
||||
nullable: f.is_nullable(),
|
||||
},
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
Self { name, cols }
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> From<&'a ArrowDataType> for DataType<'a> {
|
||||
fn from(arrow_type: &'a ArrowDataType) -> Self {
|
||||
match arrow_type {
|
||||
ArrowDataType::Null => Self::Null,
|
||||
ArrowDataType::Boolean => Self::Bool,
|
||||
ArrowDataType::Int8 => Self::I8,
|
||||
ArrowDataType::Int16 => Self::I16,
|
||||
ArrowDataType::Int32 => Self::I32,
|
||||
ArrowDataType::Int64 => Self::I64,
|
||||
ArrowDataType::UInt8 => Self::U8,
|
||||
ArrowDataType::UInt16 => Self::U16,
|
||||
ArrowDataType::UInt32 => Self::U32,
|
||||
ArrowDataType::UInt64 => Self::U64,
|
||||
ArrowDataType::Float16 => Self::F16,
|
||||
ArrowDataType::Float32 => Self::F32,
|
||||
ArrowDataType::Float64 => Self::F64,
|
||||
// Arrow's TimeUnit does not impl Copy, so we cheaply clone it:
|
||||
// See <https://github.com/apache/arrow-rs/issues/5839>
|
||||
ArrowDataType::Timestamp(unit, tz) => Self::Time(unit.clone().into(), tz.as_deref()),
|
||||
ArrowDataType::Date32 => todo!(),
|
||||
ArrowDataType::Date64 => todo!(),
|
||||
ArrowDataType::Time32(_) => todo!(),
|
||||
ArrowDataType::Time64(_) => todo!(),
|
||||
ArrowDataType::Duration(_) => todo!(),
|
||||
ArrowDataType::Interval(_) => todo!(),
|
||||
ArrowDataType::Binary => Self::Bin,
|
||||
ArrowDataType::FixedSizeBinary(_) => todo!(),
|
||||
ArrowDataType::LargeBinary => Self::BigBin,
|
||||
ArrowDataType::BinaryView => Self::BinView,
|
||||
ArrowDataType::Utf8 => Self::Str,
|
||||
ArrowDataType::LargeUtf8 => Self::BigStr,
|
||||
ArrowDataType::Utf8View => Self::StrView,
|
||||
ArrowDataType::List(_) => todo!(),
|
||||
ArrowDataType::ListView(_) => todo!(),
|
||||
ArrowDataType::FixedSizeList(_, _) => todo!(),
|
||||
ArrowDataType::LargeList(_) => todo!(),
|
||||
ArrowDataType::LargeListView(_) => todo!(),
|
||||
ArrowDataType::Struct(_) => todo!(),
|
||||
ArrowDataType::Union(_, _) => todo!(),
|
||||
ArrowDataType::Dictionary(key_type, val_type) => Self::Dict(
|
||||
Box::new(key_type.as_ref().into()),
|
||||
Box::new(val_type.as_ref().into()),
|
||||
),
|
||||
ArrowDataType::Decimal128(_, _) => todo!(),
|
||||
ArrowDataType::Decimal256(_, _) => todo!(),
|
||||
ArrowDataType::Map(_, _) => todo!(),
|
||||
ArrowDataType::RunEndEncoded(_, _) => todo!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> From<TableSnapshot<'a>> for TableDefinition {
|
||||
fn from(snap: TableSnapshot<'a>) -> Self {
|
||||
let name = snap.name.to_owned();
|
||||
let mut b = SchemaBuilder::new();
|
||||
// TODO: may need to capture some schema-level metadata, currently, this causes trouble in
|
||||
// tests, so I am omitting this for now:
|
||||
// b.measurement(&name);
|
||||
for (name, col) in snap.cols {
|
||||
match col.influx_type {
|
||||
InfluxType::Tag => {
|
||||
b.influx_column(name, schema::InfluxColumnType::Tag);
|
||||
}
|
||||
InfluxType::Field => {
|
||||
b.influx_field(name, col.r#type.into());
|
||||
}
|
||||
InfluxType::Time => {
|
||||
b.timestamp();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let schema = b.build().expect("valid schema from snapshot");
|
||||
|
||||
Self { name, schema }
|
||||
}
|
||||
}
|
||||
|
||||
// NOTE: Ideally, we will remove the need for the InfluxFieldType, and be able
|
||||
// to use Arrow's DataType directly. If that happens, this conversion will need
|
||||
// to support the entirety of Arrow's DataType enum, which is why [`DataType`]
|
||||
// has been defined to mimic the Arrow type.
|
||||
//
|
||||
// See <https://github.com/influxdata/influxdb_iox/issues/11111>
|
||||
impl<'a> From<DataType<'a>> for schema::InfluxFieldType {
|
||||
fn from(data_type: DataType<'a>) -> Self {
|
||||
match data_type {
|
||||
DataType::Bool => Self::Boolean,
|
||||
DataType::I64 => Self::Integer,
|
||||
DataType::U64 => Self::UInteger,
|
||||
DataType::F64 => Self::Float,
|
||||
DataType::Str => Self::String,
|
||||
other => unimplemented!("unsupported data type in catalog {other:?}"),
|
||||
}
|
||||
}
|
||||
}
|
|
@ -230,7 +230,7 @@ impl Persister for PersisterImpl {
|
|||
|
||||
async fn persist_catalog(&self, segment_id: SegmentId, catalog: Catalog) -> Result<()> {
|
||||
let catalog_path = CatalogFilePath::new(segment_id);
|
||||
let json = serde_json::to_vec_pretty(&catalog.into_inner())?;
|
||||
let json = serde_json::to_vec_pretty(&catalog)?;
|
||||
self.object_store
|
||||
.put(catalog_path.as_ref(), Bytes::from(json))
|
||||
.await?;
|
||||
|
|
|
@ -0,0 +1,154 @@
|
|||
---
|
||||
source: influxdb3_write/src/catalog.rs
|
||||
expression: catalog
|
||||
---
|
||||
{
|
||||
"databases": {
|
||||
"test_db": {
|
||||
"name": "test_db",
|
||||
"tables": {
|
||||
"test_table_1": {
|
||||
"name": "test_table_1",
|
||||
"cols": {
|
||||
"bool_field": {
|
||||
"type": "bool",
|
||||
"influx_type": "field",
|
||||
"nullable": true
|
||||
},
|
||||
"f64_field": {
|
||||
"type": "f64",
|
||||
"influx_type": "field",
|
||||
"nullable": true
|
||||
},
|
||||
"i64_field": {
|
||||
"type": "i64",
|
||||
"influx_type": "field",
|
||||
"nullable": true
|
||||
},
|
||||
"string_field": {
|
||||
"type": "str",
|
||||
"influx_type": "field",
|
||||
"nullable": true
|
||||
},
|
||||
"tag_1": {
|
||||
"type": {
|
||||
"dict": [
|
||||
"i32",
|
||||
"str"
|
||||
]
|
||||
},
|
||||
"influx_type": "tag",
|
||||
"nullable": true
|
||||
},
|
||||
"tag_2": {
|
||||
"type": {
|
||||
"dict": [
|
||||
"i32",
|
||||
"str"
|
||||
]
|
||||
},
|
||||
"influx_type": "tag",
|
||||
"nullable": true
|
||||
},
|
||||
"tag_3": {
|
||||
"type": {
|
||||
"dict": [
|
||||
"i32",
|
||||
"str"
|
||||
]
|
||||
},
|
||||
"influx_type": "tag",
|
||||
"nullable": true
|
||||
},
|
||||
"time": {
|
||||
"type": {
|
||||
"time": [
|
||||
"ns",
|
||||
null
|
||||
]
|
||||
},
|
||||
"influx_type": "time",
|
||||
"nullable": false
|
||||
},
|
||||
"u64_field": {
|
||||
"type": "u64",
|
||||
"influx_type": "field",
|
||||
"nullable": true
|
||||
}
|
||||
}
|
||||
},
|
||||
"test_table_2": {
|
||||
"name": "test_table_2",
|
||||
"cols": {
|
||||
"bool_field": {
|
||||
"type": "bool",
|
||||
"influx_type": "field",
|
||||
"nullable": true
|
||||
},
|
||||
"f64_field": {
|
||||
"type": "f64",
|
||||
"influx_type": "field",
|
||||
"nullable": true
|
||||
},
|
||||
"i64_field": {
|
||||
"type": "i64",
|
||||
"influx_type": "field",
|
||||
"nullable": true
|
||||
},
|
||||
"string_field": {
|
||||
"type": "str",
|
||||
"influx_type": "field",
|
||||
"nullable": true
|
||||
},
|
||||
"tag_1": {
|
||||
"type": {
|
||||
"dict": [
|
||||
"i32",
|
||||
"str"
|
||||
]
|
||||
},
|
||||
"influx_type": "tag",
|
||||
"nullable": true
|
||||
},
|
||||
"tag_2": {
|
||||
"type": {
|
||||
"dict": [
|
||||
"i32",
|
||||
"str"
|
||||
]
|
||||
},
|
||||
"influx_type": "tag",
|
||||
"nullable": true
|
||||
},
|
||||
"tag_3": {
|
||||
"type": {
|
||||
"dict": [
|
||||
"i32",
|
||||
"str"
|
||||
]
|
||||
},
|
||||
"influx_type": "tag",
|
||||
"nullable": true
|
||||
},
|
||||
"time": {
|
||||
"type": {
|
||||
"time": [
|
||||
"ns",
|
||||
null
|
||||
]
|
||||
},
|
||||
"influx_type": "time",
|
||||
"nullable": false
|
||||
},
|
||||
"u64_field": {
|
||||
"type": "u64",
|
||||
"influx_type": "field",
|
||||
"nullable": true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"sequence": 1
|
||||
}
|
|
@ -619,7 +619,7 @@ pub(crate) mod tests {
|
|||
)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let expected_cpu_table = vec![
|
||||
let expected_cpu_table = [
|
||||
"+-----+----------+--------------------------------+",
|
||||
"| bar | tag1 | time |",
|
||||
"+-----+----------+--------------------------------+",
|
||||
|
@ -627,7 +627,7 @@ pub(crate) mod tests {
|
|||
"| 2.0 | cupcakes | 1970-01-01T00:00:00.000000030Z |",
|
||||
"+-----+----------+--------------------------------+",
|
||||
];
|
||||
assert_batches_eq!(&expected_cpu_table, &[cpu_table]);
|
||||
assert_batches_eq!(expected_cpu_table, &[cpu_table]);
|
||||
|
||||
let mem_table = open_segment
|
||||
.table_record_batch(
|
||||
|
@ -638,14 +638,14 @@ pub(crate) mod tests {
|
|||
)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let expected_mem_table = vec![
|
||||
let expected_mem_table = [
|
||||
"+-----+--------+--------------------------------+",
|
||||
"| bar | tag2 | time |",
|
||||
"+-----+--------+--------------------------------+",
|
||||
"| 2.0 | snakes | 1970-01-01T00:00:00.000000020Z |",
|
||||
"+-----+--------+--------------------------------+",
|
||||
];
|
||||
assert_batches_eq!(&expected_mem_table, &[mem_table]);
|
||||
assert_batches_eq!(expected_mem_table, &[mem_table]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
@ -695,7 +695,7 @@ pub(crate) mod tests {
|
|||
)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let expected_cpu_table = vec![
|
||||
let expected_cpu_table = [
|
||||
"+-----+------+------+----------+------+--------------------------------+",
|
||||
"| bar | fval | ival | tag1 | tag2 | time |",
|
||||
"+-----+------+------+----------+------+--------------------------------+",
|
||||
|
@ -706,7 +706,7 @@ pub(crate) mod tests {
|
|||
"| | 2.1 | | | | 1970-01-01T00:00:00.000000040Z |",
|
||||
"+-----+------+------+----------+------+--------------------------------+",
|
||||
];
|
||||
assert_batches_eq!(&expected_cpu_table, &[cpu_table]);
|
||||
assert_batches_eq!(expected_cpu_table, &[cpu_table]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
|
|
@ -7,7 +7,9 @@ mod segment_state;
|
|||
mod table_buffer;
|
||||
|
||||
use crate::cache::ParquetCache;
|
||||
use crate::catalog::{Catalog, DatabaseSchema, TableDefinition, TIME_COLUMN_NAME};
|
||||
use crate::catalog::{
|
||||
influx_column_type_from_field_value, Catalog, DatabaseSchema, TableDefinition, TIME_COLUMN_NAME,
|
||||
};
|
||||
use crate::chunk::ParquetChunk;
|
||||
use crate::persister::PersisterImpl;
|
||||
use crate::write_buffer::flusher::WriteBufferFlusher;
|
||||
|
@ -34,10 +36,11 @@ use object_store::ObjectMeta;
|
|||
use observability_deps::tracing::{debug, error};
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use parquet_file::storage::ParquetExecInput;
|
||||
use schema::InfluxColumnType;
|
||||
use sha2::Digest;
|
||||
use sha2::Sha256;
|
||||
use std::borrow::Cow;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::collections::HashMap;
|
||||
use std::i64;
|
||||
use std::sync::{Arc, OnceLock};
|
||||
use thiserror::Error;
|
||||
|
@ -624,13 +627,16 @@ fn validate_and_update_schema(line: &ParsedLine<'_>, schema: &mut Cow<'_, Databa
|
|||
if let Some(tagset) = &line.series.tag_set {
|
||||
for (tag_key, _) in tagset {
|
||||
if !t.column_exists(tag_key.as_str()) {
|
||||
new_cols.push((tag_key.to_string(), ColumnType::Tag as i16));
|
||||
new_cols.push((tag_key.to_string(), InfluxColumnType::Tag));
|
||||
}
|
||||
}
|
||||
}
|
||||
for (field_name, value) in &line.field_set {
|
||||
if !t.column_exists(field_name.as_str()) {
|
||||
new_cols.push((field_name.to_string(), column_type_from_field(value) as i16));
|
||||
new_cols.push((
|
||||
field_name.to_string(),
|
||||
influx_column_type_from_field_value(value),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -640,17 +646,20 @@ fn validate_and_update_schema(line: &ParsedLine<'_>, schema: &mut Cow<'_, Databa
|
|||
}
|
||||
}
|
||||
None => {
|
||||
let mut columns = BTreeMap::new();
|
||||
let mut columns = Vec::new();
|
||||
if let Some(tag_set) = &line.series.tag_set {
|
||||
for (tag_key, _) in tag_set {
|
||||
columns.insert(tag_key.to_string(), ColumnType::Tag as i16);
|
||||
columns.push((tag_key.to_string(), InfluxColumnType::Tag));
|
||||
}
|
||||
}
|
||||
for (field_name, value) in &line.field_set {
|
||||
columns.insert(field_name.to_string(), column_type_from_field(value) as i16);
|
||||
columns.push((
|
||||
field_name.to_string(),
|
||||
influx_column_type_from_field_value(value),
|
||||
));
|
||||
}
|
||||
|
||||
columns.insert(TIME_COLUMN_NAME.to_string(), ColumnType::Time as i16);
|
||||
columns.push((TIME_COLUMN_NAME.to_string(), InfluxColumnType::Timestamp));
|
||||
|
||||
let table = TableDefinition::new(table_name, columns);
|
||||
|
||||
|
@ -891,8 +900,8 @@ mod tests {
|
|||
let db = result.schema.unwrap();
|
||||
|
||||
assert_eq!(db.tables.len(), 2);
|
||||
assert_eq!(db.tables.get("cpu").unwrap().columns().len(), 3);
|
||||
assert_eq!(db.tables.get("foo").unwrap().columns().len(), 2);
|
||||
assert_eq!(db.tables.get("cpu").unwrap().num_columns(), 3);
|
||||
assert_eq!(db.tables.get("foo").unwrap().num_columns(), 2);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
|
|
@ -37,7 +37,7 @@ pub struct TableBuffer {
|
|||
}
|
||||
|
||||
impl TableBuffer {
|
||||
pub fn new(segment_key: PartitionKey, index_columns: &[String]) -> Self {
|
||||
pub fn new(segment_key: PartitionKey, index_columns: &[&str]) -> Self {
|
||||
Self {
|
||||
segment_key,
|
||||
timestamp_min: i64::MAX,
|
||||
|
@ -263,10 +263,10 @@ struct BufferIndex {
|
|||
}
|
||||
|
||||
impl BufferIndex {
|
||||
fn new(columns: &[String]) -> Self {
|
||||
fn new(columns: &[&str]) -> Self {
|
||||
let columns = columns
|
||||
.iter()
|
||||
.map(|c| (c.clone(), HashMap::new()))
|
||||
.map(|c| (c.to_string(), HashMap::new()))
|
||||
.collect();
|
||||
Self { columns }
|
||||
}
|
||||
|
@ -442,7 +442,7 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn tag_row_index() {
|
||||
let mut table_buffer = TableBuffer::new(PartitionKey::from("table"), &["tag".to_string()]);
|
||||
let mut table_buffer = TableBuffer::new(PartitionKey::from("table"), &["tag"]);
|
||||
let schema = SchemaBuilder::with_capacity(3)
|
||||
.tag("tag")
|
||||
.influx_field("value", InfluxFieldType::Integer)
|
||||
|
@ -567,7 +567,7 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn computed_size_of_buffer() {
|
||||
let mut table_buffer = TableBuffer::new(PartitionKey::from("table"), &["tag".to_string()]);
|
||||
let mut table_buffer = TableBuffer::new(PartitionKey::from("table"), &["tag"]);
|
||||
|
||||
let rows = vec![
|
||||
Row {
|
||||
|
|
Loading…
Reference in New Issue