fix: Move column types to their own module and encapsulate more details
parent
bbf6a5087c
commit
1f42fd8ebf
|
@ -0,0 +1,388 @@
|
|||
//! Types having to do with columns.
|
||||
|
||||
use super::{ChunkId, TableId};
|
||||
use influxdb_line_protocol::FieldValue;
|
||||
use schema::{builder::SchemaBuilder, InfluxColumnType, InfluxFieldType, Schema};
|
||||
use sqlx::postgres::PgHasArrayType;
|
||||
use std::{
|
||||
collections::{BTreeMap, BTreeSet, HashMap},
|
||||
convert::TryFrom,
|
||||
ops::Deref,
|
||||
};
|
||||
|
||||
/// Unique ID for a `Column`
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)]
|
||||
#[sqlx(transparent)]
|
||||
pub struct ColumnId(i64);
|
||||
|
||||
#[allow(missing_docs)]
|
||||
impl ColumnId {
|
||||
pub fn new(v: i64) -> Self {
|
||||
Self(v)
|
||||
}
|
||||
pub fn get(&self) -> i64 {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl PgHasArrayType for ColumnId {
|
||||
fn array_type_info() -> sqlx::postgres::PgTypeInfo {
|
||||
<i64 as PgHasArrayType>::array_type_info()
|
||||
}
|
||||
}
|
||||
|
||||
/// Column definitions for a table indexed by their name
|
||||
#[derive(Debug, Clone, Eq, PartialEq)]
|
||||
pub struct ColumnsByName(BTreeMap<String, ColumnSchema>);
|
||||
|
||||
impl From<BTreeMap<&str, ColumnSchema>> for ColumnsByName {
|
||||
fn from(value: BTreeMap<&str, ColumnSchema>) -> Self {
|
||||
Self(
|
||||
value
|
||||
.into_iter()
|
||||
.map(|(name, column)| (name.to_owned(), column))
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl ColumnsByName {
|
||||
/// Create a new instance holding the given [`Column`]s.
|
||||
pub fn new(columns: &[Column]) -> Self {
|
||||
Self(
|
||||
columns
|
||||
.iter()
|
||||
.map(|c| {
|
||||
(
|
||||
c.name.to_owned(),
|
||||
ColumnSchema {
|
||||
id: c.id,
|
||||
column_type: c.column_type,
|
||||
},
|
||||
)
|
||||
})
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Add the given column name and schema to this set of columns.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This method panics if a column of the same name already exists in `self`.
|
||||
pub fn add_column(
|
||||
&mut self,
|
||||
column_name: impl Into<String>,
|
||||
column_schema: impl Into<ColumnSchema>,
|
||||
) {
|
||||
let old = self.0.insert(column_name.into(), column_schema.into());
|
||||
assert!(old.is_none());
|
||||
}
|
||||
|
||||
/// Iterate over the names and columns.
|
||||
pub fn iter(&self) -> impl Iterator<Item = (&String, &ColumnSchema)> {
|
||||
self.0.iter()
|
||||
}
|
||||
|
||||
/// Whether a column with this name is in the set.
|
||||
pub fn contains_column_name(&self, name: &str) -> bool {
|
||||
self.0.contains_key(name)
|
||||
}
|
||||
|
||||
/// Return number of columns in the set.
|
||||
pub fn column_count(&self) -> usize {
|
||||
self.0.len()
|
||||
}
|
||||
|
||||
/// Return the set of column names. Used in combination with a write operation's
|
||||
/// column names to determine whether a write would exceed the max allowed columns.
|
||||
pub fn names(&self) -> BTreeSet<&str> {
|
||||
self.0.keys().map(|name| name.as_str()).collect()
|
||||
}
|
||||
|
||||
/// Return an iterator of the set of column IDs.
|
||||
pub fn ids(&self) -> impl Iterator<Item = ColumnId> + '_ {
|
||||
self.0.values().map(|c| c.id)
|
||||
}
|
||||
|
||||
/// Get a column by its name.
|
||||
pub fn get(&self, name: &str) -> Option<&ColumnSchema> {
|
||||
self.0.get(name)
|
||||
}
|
||||
|
||||
/// Create `ID->name` map for columns.
|
||||
pub fn id_map(&self) -> HashMap<ColumnId, &str> {
|
||||
self.0
|
||||
.iter()
|
||||
.map(|(name, c)| (c.id, name.as_str()))
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
// ColumnsByName is a newtype so that we can implement this `TryFrom` in this crate
|
||||
impl TryFrom<&ColumnsByName> for Schema {
|
||||
type Error = schema::builder::Error;
|
||||
|
||||
fn try_from(value: &ColumnsByName) -> Result<Self, Self::Error> {
|
||||
let mut builder = SchemaBuilder::new();
|
||||
|
||||
for (column_name, column_schema) in value.iter() {
|
||||
let t = InfluxColumnType::from(column_schema.column_type);
|
||||
builder.influx_column(column_name, t);
|
||||
}
|
||||
|
||||
builder.build()
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<ColumnsByName> for Schema {
|
||||
type Error = schema::builder::Error;
|
||||
|
||||
fn try_from(value: ColumnsByName) -> Result<Self, Self::Error> {
|
||||
Self::try_from(&value)
|
||||
}
|
||||
}
|
||||
|
||||
/// Data object for a column
|
||||
#[derive(Debug, Clone, sqlx::FromRow, Eq, PartialEq)]
|
||||
pub struct Column {
|
||||
/// the column id
|
||||
pub id: ColumnId,
|
||||
/// the table id the column is in
|
||||
pub table_id: TableId,
|
||||
/// the name of the column, which is unique in the table
|
||||
pub name: String,
|
||||
/// the logical type of the column
|
||||
pub column_type: ColumnType,
|
||||
}
|
||||
|
||||
impl Column {
|
||||
/// returns true if the column type is a tag
|
||||
pub fn is_tag(&self) -> bool {
|
||||
self.column_type == ColumnType::Tag
|
||||
}
|
||||
|
||||
/// returns true if the column type matches the line protocol field value type
|
||||
pub fn matches_field_type(&self, field_value: &FieldValue) -> bool {
|
||||
match field_value {
|
||||
FieldValue::I64(_) => self.column_type == ColumnType::I64,
|
||||
FieldValue::U64(_) => self.column_type == ColumnType::U64,
|
||||
FieldValue::F64(_) => self.column_type == ColumnType::F64,
|
||||
FieldValue::String(_) => self.column_type == ColumnType::String,
|
||||
FieldValue::Boolean(_) => self.column_type == ColumnType::Bool,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The column id and its type for a column
|
||||
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
|
||||
pub struct ColumnSchema {
|
||||
/// the column id
|
||||
pub id: ColumnId,
|
||||
/// the column type
|
||||
pub column_type: ColumnType,
|
||||
}
|
||||
|
||||
impl ColumnSchema {
|
||||
/// returns true if the column is a tag
|
||||
pub fn is_tag(&self) -> bool {
|
||||
self.column_type == ColumnType::Tag
|
||||
}
|
||||
|
||||
/// returns true if the column matches the line protocol field value type
|
||||
pub fn matches_field_type(&self, field_value: &FieldValue) -> bool {
|
||||
matches!(
|
||||
(field_value, self.column_type),
|
||||
(FieldValue::I64(_), ColumnType::I64)
|
||||
| (FieldValue::U64(_), ColumnType::U64)
|
||||
| (FieldValue::F64(_), ColumnType::F64)
|
||||
| (FieldValue::String(_), ColumnType::String)
|
||||
| (FieldValue::Boolean(_), ColumnType::Bool)
|
||||
)
|
||||
}
|
||||
|
||||
/// Returns true if `mb_column` is of the same type as `self`.
|
||||
pub fn matches_type(&self, mb_column_influx_type: InfluxColumnType) -> bool {
|
||||
self.column_type == mb_column_influx_type
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&Column> for ColumnSchema {
    /// Copy the ID and type out of a [`Column`]; its name and table ID are not carried over.
    fn from(c: &Column) -> Self {
        Self {
            id: c.id,
            column_type: c.column_type,
        }
    }
}
|
||||
|
||||
/// The column data type
|
||||
#[allow(missing_docs)]
|
||||
#[derive(Debug, Copy, Clone, Eq, PartialEq, PartialOrd, Ord, Hash, sqlx::Type)]
|
||||
#[repr(i16)]
|
||||
pub enum ColumnType {
|
||||
I64 = 1,
|
||||
U64 = 2,
|
||||
F64 = 3,
|
||||
Bool = 4,
|
||||
String = 5,
|
||||
Time = 6,
|
||||
Tag = 7,
|
||||
}
|
||||
|
||||
impl ColumnType {
|
||||
/// the short string description of the type
|
||||
pub fn as_str(&self) -> &'static str {
|
||||
match self {
|
||||
Self::I64 => "i64",
|
||||
Self::U64 => "u64",
|
||||
Self::F64 => "f64",
|
||||
Self::Bool => "bool",
|
||||
Self::String => "string",
|
||||
Self::Time => "time",
|
||||
Self::Tag => "tag",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for ColumnType {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let s = self.as_str();
|
||||
|
||||
write!(f, "{s}")
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<i16> for ColumnType {
|
||||
type Error = Box<dyn std::error::Error>;
|
||||
|
||||
fn try_from(value: i16) -> Result<Self, Self::Error> {
|
||||
match value {
|
||||
x if x == Self::I64 as i16 => Ok(Self::I64),
|
||||
x if x == Self::U64 as i16 => Ok(Self::U64),
|
||||
x if x == Self::F64 as i16 => Ok(Self::F64),
|
||||
x if x == Self::Bool as i16 => Ok(Self::Bool),
|
||||
x if x == Self::String as i16 => Ok(Self::String),
|
||||
x if x == Self::Time as i16 => Ok(Self::Time),
|
||||
x if x == Self::Tag as i16 => Ok(Self::Tag),
|
||||
_ => Err("invalid column value".into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<InfluxColumnType> for ColumnType {
|
||||
fn from(value: InfluxColumnType) -> Self {
|
||||
match value {
|
||||
InfluxColumnType::Tag => Self::Tag,
|
||||
InfluxColumnType::Field(InfluxFieldType::Float) => Self::F64,
|
||||
InfluxColumnType::Field(InfluxFieldType::Integer) => Self::I64,
|
||||
InfluxColumnType::Field(InfluxFieldType::UInteger) => Self::U64,
|
||||
InfluxColumnType::Field(InfluxFieldType::String) => Self::String,
|
||||
InfluxColumnType::Field(InfluxFieldType::Boolean) => Self::Bool,
|
||||
InfluxColumnType::Timestamp => Self::Time,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ColumnType> for InfluxColumnType {
|
||||
fn from(value: ColumnType) -> Self {
|
||||
match value {
|
||||
ColumnType::I64 => Self::Field(InfluxFieldType::Integer),
|
||||
ColumnType::U64 => Self::Field(InfluxFieldType::UInteger),
|
||||
ColumnType::F64 => Self::Field(InfluxFieldType::Float),
|
||||
ColumnType::Bool => Self::Field(InfluxFieldType::Boolean),
|
||||
ColumnType::String => Self::Field(InfluxFieldType::String),
|
||||
ColumnType::Time => Self::Timestamp,
|
||||
ColumnType::Tag => Self::Tag,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq<InfluxColumnType> for ColumnType {
|
||||
fn eq(&self, got: &InfluxColumnType) -> bool {
|
||||
match self {
|
||||
Self::I64 => matches!(got, InfluxColumnType::Field(InfluxFieldType::Integer)),
|
||||
Self::U64 => matches!(got, InfluxColumnType::Field(InfluxFieldType::UInteger)),
|
||||
Self::F64 => matches!(got, InfluxColumnType::Field(InfluxFieldType::Float)),
|
||||
Self::Bool => matches!(got, InfluxColumnType::Field(InfluxFieldType::Boolean)),
|
||||
Self::String => matches!(got, InfluxColumnType::Field(InfluxFieldType::String)),
|
||||
Self::Time => matches!(got, InfluxColumnType::Timestamp),
|
||||
Self::Tag => matches!(got, InfluxColumnType::Tag),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the `ColumnType` for the passed in line protocol `FieldValue` type
|
||||
pub fn column_type_from_field(field_value: &FieldValue) -> ColumnType {
|
||||
match field_value {
|
||||
FieldValue::I64(_) => ColumnType::I64,
|
||||
FieldValue::U64(_) => ColumnType::U64,
|
||||
FieldValue::F64(_) => ColumnType::F64,
|
||||
FieldValue::String(_) => ColumnType::String,
|
||||
FieldValue::Boolean(_) => ColumnType::Bool,
|
||||
}
|
||||
}
|
||||
|
||||
/// Set of columns.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, sqlx::Type)]
|
||||
#[sqlx(transparent)]
|
||||
pub struct ColumnSet(Vec<ColumnId>);
|
||||
|
||||
impl ColumnSet {
|
||||
/// Create new column set.
|
||||
///
|
||||
/// The order of the passed columns will NOT be preserved.
|
||||
///
|
||||
/// # Panic
|
||||
/// Panics when the set of passed columns contains duplicates.
|
||||
pub fn new<I>(columns: I) -> Self
|
||||
where
|
||||
I: IntoIterator<Item = ColumnId>,
|
||||
{
|
||||
let mut columns: Vec<ColumnId> = columns.into_iter().collect();
|
||||
columns.sort();
|
||||
|
||||
let len_pre_dedup = columns.len();
|
||||
columns.dedup();
|
||||
let len_post_dedup = columns.len();
|
||||
assert_eq!(len_pre_dedup, len_post_dedup, "set contains duplicates");
|
||||
|
||||
columns.shrink_to_fit();
|
||||
|
||||
Self(columns)
|
||||
}
|
||||
|
||||
/// Estimate the memory consumption of this object and its contents
|
||||
pub fn size(&self) -> usize {
|
||||
std::mem::size_of_val(self) + (std::mem::size_of::<ChunkId>() * self.0.capacity())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ColumnSet> for Vec<ColumnId> {
|
||||
fn from(set: ColumnSet) -> Self {
|
||||
set.0
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for ColumnSet {
|
||||
type Target = [ColumnId];
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.0.deref()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    /// Building a `ColumnSet` from IDs that repeat a value must panic.
    #[test]
    #[should_panic = "set contains duplicates"]
    fn test_column_set_duplicates() {
        let ids = [1, 2, 1].map(ColumnId::new);
        ColumnSet::new(ids);
    }
}
|
|
@ -15,16 +15,13 @@
|
|||
|
||||
pub mod sequence_number_set;
|
||||
|
||||
mod columns;
|
||||
pub use columns::*;
|
||||
mod namespace_name;
|
||||
pub use namespace_name::*;
|
||||
|
||||
use influxdb_line_protocol::FieldValue;
|
||||
use observability_deps::tracing::warn;
|
||||
use schema::{
|
||||
builder::SchemaBuilder, sort::SortKey, InfluxColumnType, InfluxFieldType, Schema,
|
||||
TIME_COLUMN_NAME,
|
||||
};
|
||||
use sqlx::postgres::PgHasArrayType;
|
||||
use schema::{sort::SortKey, TIME_COLUMN_NAME};
|
||||
use std::{
|
||||
borrow::Borrow,
|
||||
collections::{BTreeMap, BTreeSet, HashMap},
|
||||
|
@ -150,27 +147,6 @@ impl std::fmt::Display for TableId {
|
|||
}
|
||||
}
|
||||
|
||||
/// Unique ID for a `Column`
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)]
|
||||
#[sqlx(transparent)]
|
||||
pub struct ColumnId(i64);
|
||||
|
||||
#[allow(missing_docs)]
|
||||
impl ColumnId {
|
||||
pub fn new(v: i64) -> Self {
|
||||
Self(v)
|
||||
}
|
||||
pub fn get(&self) -> i64 {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl PgHasArrayType for ColumnId {
|
||||
fn array_type_info() -> sqlx::postgres::PgTypeInfo {
|
||||
<i64 as PgHasArrayType>::array_type_info()
|
||||
}
|
||||
}
|
||||
|
||||
/// Unique ID for a `Partition`
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type, sqlx::FromRow)]
|
||||
#[sqlx(transparent)]
|
||||
|
@ -390,94 +366,6 @@ pub struct Table {
|
|||
pub name: String,
|
||||
}
|
||||
|
||||
/// Column definitions for a table indexed by their name
|
||||
#[derive(Debug, Clone, Eq, PartialEq)]
|
||||
pub struct ColumnsByName(BTreeMap<String, ColumnSchema>);
|
||||
|
||||
impl From<BTreeMap<&str, ColumnSchema>> for ColumnsByName {
|
||||
fn from(value: BTreeMap<&str, ColumnSchema>) -> Self {
|
||||
Self(
|
||||
value
|
||||
.into_iter()
|
||||
.map(|(name, column)| (name.to_owned(), column))
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl ColumnsByName {
|
||||
/// Create a new instance holding the given [`Column`]s.
|
||||
pub fn new(columns: &[Column]) -> Self {
|
||||
Self(
|
||||
columns
|
||||
.iter()
|
||||
.map(|c| {
|
||||
(
|
||||
c.name.to_owned(),
|
||||
ColumnSchema {
|
||||
id: c.id,
|
||||
column_type: c.column_type,
|
||||
},
|
||||
)
|
||||
})
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Iterate over the names and columns.
|
||||
pub fn iter(&self) -> impl Iterator<Item = (&String, &ColumnSchema)> {
|
||||
self.0.iter()
|
||||
}
|
||||
|
||||
/// Return the set of column names. Used in combination with a write operation's
|
||||
/// column names to determine whether a write would exceed the max allowed columns.
|
||||
pub fn names(&self) -> BTreeSet<&str> {
|
||||
self.0.keys().map(|name| name.as_str()).collect()
|
||||
}
|
||||
|
||||
/// Return an iterator of the set of column IDs.
|
||||
pub fn ids(&self) -> impl Iterator<Item = ColumnId> + '_ {
|
||||
self.0.values().map(|c| c.id)
|
||||
}
|
||||
|
||||
/// Get a column by its name.
|
||||
pub fn get(&self, name: &str) -> Option<&ColumnSchema> {
|
||||
self.0.get(name)
|
||||
}
|
||||
|
||||
/// Create `ID->name` map for columns.
|
||||
pub fn id_map(&self) -> HashMap<ColumnId, &str> {
|
||||
self.0
|
||||
.iter()
|
||||
.map(|(name, c)| (c.id, name.as_str()))
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
// ColumnsByName is a newtype so that we can implement this `TryFrom` in this crate
|
||||
impl TryFrom<&ColumnsByName> for Schema {
|
||||
type Error = schema::builder::Error;
|
||||
|
||||
fn try_from(value: &ColumnsByName) -> Result<Self, Self::Error> {
|
||||
let mut builder = SchemaBuilder::new();
|
||||
|
||||
for (column_name, column_schema) in value.iter() {
|
||||
let t = InfluxColumnType::from(column_schema.column_type);
|
||||
builder.influx_column(column_name, t);
|
||||
}
|
||||
|
||||
builder.build()
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<ColumnsByName> for Schema {
|
||||
type Error = schema::builder::Error;
|
||||
|
||||
fn try_from(value: ColumnsByName) -> Result<Self, Self::Error> {
|
||||
Self::try_from(&value)
|
||||
}
|
||||
}
|
||||
|
||||
/// Column definitions for a table
|
||||
#[derive(Debug, Clone, Eq, PartialEq)]
|
||||
pub struct TableSchema {
|
||||
|
@ -518,11 +406,7 @@ impl TableSchema {
|
|||
/// This method panics if a column of the same name already exists in
|
||||
/// `self`.
|
||||
pub fn add_column(&mut self, col: &Column) {
|
||||
let old = self
|
||||
.columns
|
||||
.0
|
||||
.insert(col.name.clone(), ColumnSchema::from(col));
|
||||
assert!(old.is_none());
|
||||
self.columns.add_column(&col.name, col);
|
||||
}
|
||||
|
||||
/// Add the name and column schema to this table's schema.
|
||||
|
@ -532,8 +416,7 @@ impl TableSchema {
|
|||
/// This method panics if a column of the same name already exists in
|
||||
/// `self`.
|
||||
pub fn add_column_schema(&mut self, name: &str, column_schema: &ColumnSchema) {
|
||||
let old = self.columns.0.insert(name.into(), column_schema.to_owned());
|
||||
assert!(old.is_none());
|
||||
self.columns.add_column(name, column_schema.to_owned());
|
||||
}
|
||||
|
||||
/// Estimated Size in bytes including `self`.
|
||||
|
@ -553,7 +436,7 @@ impl TableSchema {
|
|||
|
||||
/// Whether a column with this name is in the schema.
|
||||
pub fn contains_column_name(&self, name: &str) -> bool {
|
||||
self.columns.0.contains_key(name)
|
||||
self.columns.contains_column_name(name)
|
||||
}
|
||||
|
||||
/// Return the set of column names for this table. Used in combination with a write operation's
|
||||
|
@ -564,191 +447,7 @@ impl TableSchema {
|
|||
|
||||
/// Return number of columns of the table
|
||||
pub fn column_count(&self) -> usize {
|
||||
self.columns.0.len()
|
||||
}
|
||||
}
|
||||
|
||||
/// Data object for a column
|
||||
#[derive(Debug, Clone, sqlx::FromRow, Eq, PartialEq)]
|
||||
pub struct Column {
|
||||
/// the column id
|
||||
pub id: ColumnId,
|
||||
/// the table id the column is in
|
||||
pub table_id: TableId,
|
||||
/// the name of the column, which is unique in the table
|
||||
pub name: String,
|
||||
/// the logical type of the column
|
||||
pub column_type: ColumnType,
|
||||
}
|
||||
|
||||
impl Column {
|
||||
/// returns true if the column type is a tag
|
||||
pub fn is_tag(&self) -> bool {
|
||||
self.column_type == ColumnType::Tag
|
||||
}
|
||||
|
||||
/// returns true if the column type matches the line protocol field value type
|
||||
pub fn matches_field_type(&self, field_value: &FieldValue) -> bool {
|
||||
match field_value {
|
||||
FieldValue::I64(_) => self.column_type == ColumnType::I64,
|
||||
FieldValue::U64(_) => self.column_type == ColumnType::U64,
|
||||
FieldValue::F64(_) => self.column_type == ColumnType::F64,
|
||||
FieldValue::String(_) => self.column_type == ColumnType::String,
|
||||
FieldValue::Boolean(_) => self.column_type == ColumnType::Bool,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The column id and its type for a column
|
||||
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
|
||||
pub struct ColumnSchema {
|
||||
/// the column id
|
||||
pub id: ColumnId,
|
||||
/// the column type
|
||||
pub column_type: ColumnType,
|
||||
}
|
||||
|
||||
impl ColumnSchema {
|
||||
/// returns true if the column is a tag
|
||||
pub fn is_tag(&self) -> bool {
|
||||
self.column_type == ColumnType::Tag
|
||||
}
|
||||
|
||||
/// returns true if the column matches the line protocol field value type
|
||||
pub fn matches_field_type(&self, field_value: &FieldValue) -> bool {
|
||||
matches!(
|
||||
(field_value, self.column_type),
|
||||
(FieldValue::I64(_), ColumnType::I64)
|
||||
| (FieldValue::U64(_), ColumnType::U64)
|
||||
| (FieldValue::F64(_), ColumnType::F64)
|
||||
| (FieldValue::String(_), ColumnType::String)
|
||||
| (FieldValue::Boolean(_), ColumnType::Bool)
|
||||
)
|
||||
}
|
||||
|
||||
/// Returns true if `mb_column` is of the same type as `self`.
|
||||
pub fn matches_type(&self, mb_column_influx_type: InfluxColumnType) -> bool {
|
||||
self.column_type == mb_column_influx_type
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&Column> for ColumnSchema {
|
||||
fn from(c: &Column) -> Self {
|
||||
let Column {
|
||||
id, column_type, ..
|
||||
} = c;
|
||||
|
||||
Self {
|
||||
id: *id,
|
||||
column_type: *column_type,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The column data type
|
||||
#[allow(missing_docs)]
|
||||
#[derive(Debug, Copy, Clone, Eq, PartialEq, PartialOrd, Ord, Hash, sqlx::Type)]
|
||||
#[repr(i16)]
|
||||
pub enum ColumnType {
|
||||
I64 = 1,
|
||||
U64 = 2,
|
||||
F64 = 3,
|
||||
Bool = 4,
|
||||
String = 5,
|
||||
Time = 6,
|
||||
Tag = 7,
|
||||
}
|
||||
|
||||
impl ColumnType {
|
||||
/// the short string description of the type
|
||||
pub fn as_str(&self) -> &'static str {
|
||||
match self {
|
||||
Self::I64 => "i64",
|
||||
Self::U64 => "u64",
|
||||
Self::F64 => "f64",
|
||||
Self::Bool => "bool",
|
||||
Self::String => "string",
|
||||
Self::Time => "time",
|
||||
Self::Tag => "tag",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for ColumnType {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let s = self.as_str();
|
||||
|
||||
write!(f, "{s}")
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<i16> for ColumnType {
|
||||
type Error = Box<dyn std::error::Error>;
|
||||
|
||||
fn try_from(value: i16) -> Result<Self, Self::Error> {
|
||||
match value {
|
||||
x if x == Self::I64 as i16 => Ok(Self::I64),
|
||||
x if x == Self::U64 as i16 => Ok(Self::U64),
|
||||
x if x == Self::F64 as i16 => Ok(Self::F64),
|
||||
x if x == Self::Bool as i16 => Ok(Self::Bool),
|
||||
x if x == Self::String as i16 => Ok(Self::String),
|
||||
x if x == Self::Time as i16 => Ok(Self::Time),
|
||||
x if x == Self::Tag as i16 => Ok(Self::Tag),
|
||||
_ => Err("invalid column value".into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<InfluxColumnType> for ColumnType {
|
||||
fn from(value: InfluxColumnType) -> Self {
|
||||
match value {
|
||||
InfluxColumnType::Tag => Self::Tag,
|
||||
InfluxColumnType::Field(InfluxFieldType::Float) => Self::F64,
|
||||
InfluxColumnType::Field(InfluxFieldType::Integer) => Self::I64,
|
||||
InfluxColumnType::Field(InfluxFieldType::UInteger) => Self::U64,
|
||||
InfluxColumnType::Field(InfluxFieldType::String) => Self::String,
|
||||
InfluxColumnType::Field(InfluxFieldType::Boolean) => Self::Bool,
|
||||
InfluxColumnType::Timestamp => Self::Time,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ColumnType> for InfluxColumnType {
|
||||
fn from(value: ColumnType) -> Self {
|
||||
match value {
|
||||
ColumnType::I64 => Self::Field(InfluxFieldType::Integer),
|
||||
ColumnType::U64 => Self::Field(InfluxFieldType::UInteger),
|
||||
ColumnType::F64 => Self::Field(InfluxFieldType::Float),
|
||||
ColumnType::Bool => Self::Field(InfluxFieldType::Boolean),
|
||||
ColumnType::String => Self::Field(InfluxFieldType::String),
|
||||
ColumnType::Time => Self::Timestamp,
|
||||
ColumnType::Tag => Self::Tag,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq<InfluxColumnType> for ColumnType {
|
||||
fn eq(&self, got: &InfluxColumnType) -> bool {
|
||||
match self {
|
||||
Self::I64 => matches!(got, InfluxColumnType::Field(InfluxFieldType::Integer)),
|
||||
Self::U64 => matches!(got, InfluxColumnType::Field(InfluxFieldType::UInteger)),
|
||||
Self::F64 => matches!(got, InfluxColumnType::Field(InfluxFieldType::Float)),
|
||||
Self::Bool => matches!(got, InfluxColumnType::Field(InfluxFieldType::Boolean)),
|
||||
Self::String => matches!(got, InfluxColumnType::Field(InfluxFieldType::String)),
|
||||
Self::Time => matches!(got, InfluxColumnType::Timestamp),
|
||||
Self::Tag => matches!(got, InfluxColumnType::Tag),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the `ColumnType` for the passed in line protocol `FieldValue` type
|
||||
pub fn column_type_from_field(field_value: &FieldValue) -> ColumnType {
|
||||
match field_value {
|
||||
FieldValue::I64(_) => ColumnType::I64,
|
||||
FieldValue::U64(_) => ColumnType::U64,
|
||||
FieldValue::F64(_) => ColumnType::F64,
|
||||
FieldValue::String(_) => ColumnType::String,
|
||||
FieldValue::Boolean(_) => ColumnType::Bool,
|
||||
self.columns.column_count()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -912,55 +611,6 @@ pub struct SkippedCompaction {
|
|||
pub limit_num_files_first_in_partition: i64,
|
||||
}
|
||||
|
||||
/// Set of columns.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, sqlx::Type)]
|
||||
#[sqlx(transparent)]
|
||||
pub struct ColumnSet(Vec<ColumnId>);
|
||||
|
||||
impl ColumnSet {
|
||||
/// Create new column set.
|
||||
///
|
||||
/// The order of the passed columns will NOT be preserved.
|
||||
///
|
||||
/// # Panic
|
||||
/// Panics when the set of passed columns contains duplicates.
|
||||
pub fn new<I>(columns: I) -> Self
|
||||
where
|
||||
I: IntoIterator<Item = ColumnId>,
|
||||
{
|
||||
let mut columns: Vec<ColumnId> = columns.into_iter().collect();
|
||||
columns.sort();
|
||||
|
||||
let len_pre_dedup = columns.len();
|
||||
columns.dedup();
|
||||
let len_post_dedup = columns.len();
|
||||
assert_eq!(len_pre_dedup, len_post_dedup, "set contains duplicates");
|
||||
|
||||
columns.shrink_to_fit();
|
||||
|
||||
Self(columns)
|
||||
}
|
||||
|
||||
/// Estimate the memory consumption of this object and its contents
|
||||
pub fn size(&self) -> usize {
|
||||
std::mem::size_of_val(self) + (std::mem::size_of::<ChunkId>() * self.0.capacity())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ColumnSet> for Vec<ColumnId> {
|
||||
fn from(set: ColumnSet) -> Self {
|
||||
set.0
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for ColumnSet {
|
||||
type Target = [ColumnId];
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.0.deref()
|
||||
}
|
||||
}
|
||||
|
||||
/// Data for a parquet file reference that has been inserted in the catalog.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, sqlx::FromRow)]
|
||||
pub struct ParquetFile {
|
||||
|
@ -3076,12 +2726,6 @@ mod tests {
|
|||
let _ = Timestamp::new(i64::MIN) - Timestamp::new(1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic = "set contains duplicates"]
|
||||
fn test_column_set_duplicates() {
|
||||
ColumnSet::new([ColumnId::new(1), ColumnId::new(2), ColumnId::new(1)]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_timestamprange_start_after_end() {
|
||||
let tr = TimestampRange::new(2, 1);
|
||||
|
|
Loading…
Reference in New Issue