Merge pull request #1122 from influxdata/pd-lp-to-entry

feat: implement line protocol to flatbuffers
pull/24376/head
kodiakhq[bot] 2021-04-06 22:57:17 +00:00 committed by GitHub
commit ba2ac64f80
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 1619 additions and 14 deletions

View File

@ -741,6 +741,14 @@ impl TryFrom<management::partition_template::Part> for TemplatePart {
} }
} }
/// ShardId maps to a nodegroup that holds the the shard.
pub type ShardId = u16;
/// Assigns a given line to a specific shard id.
pub trait Sharder {
fn shard(&self, line: &ParsedLine<'_>) -> Result<ShardId>;
}
/// ShardConfig defines rules for assigning a line/row to an individual /// ShardConfig defines rules for assigning a line/row to an individual
/// host or a group of hosts. A shard /// host or a group of hosts. A shard
/// is a logical concept, but the usage is meant to split data into /// is a logical concept, but the usage is meant to split data into

View File

@ -58,7 +58,9 @@ table Delete {
// A collection of rows in a table in column oriented representation // A collection of rows in a table in column oriented representation
table TableWriteBatch { table TableWriteBatch {
name: string; name: string;
// every column must have the same number of values in its null bitmask // every column must have the same number of bytes in its null_mask. They also must
// have the same number of rows n such that for each column c:
// c.values().len() + count_ones(null_mask) = n
columns: [Column]; columns: [Column];
} }
@ -87,6 +89,8 @@ table Column {
// array that index is located in. Here's what it might look like: // array that index is located in. Here's what it might look like:
// position: 0 8 9 24 // position: 0 8 9 24
// bit: 00100011 00111000 00000001 // bit: 00100011 00111000 00000001
// An on bit (1) indicates that the value at that position is null. If there are
// no null values in the column, the null_mask is omitted from the flatbuffers.
null_mask: [ubyte]; null_mask: [ubyte];
} }

View File

@ -1224,8 +1224,8 @@ pub mod influxdata {
args: &'args ColumnArgs<'args>, args: &'args ColumnArgs<'args>,
) -> flatbuffers::WIPOffset<Column<'bldr>> { ) -> flatbuffers::WIPOffset<Column<'bldr>> {
let mut builder = ColumnBuilder::new(_fbb); let mut builder = ColumnBuilder::new(_fbb);
if let Some(x) = args.null_bitmask { if let Some(x) = args.null_mask {
builder.add_null_bitmask(x); builder.add_null_mask(x);
} }
if let Some(x) = args.values { if let Some(x) = args.values {
builder.add_values(x); builder.add_values(x);
@ -1242,7 +1242,7 @@ pub mod influxdata {
pub const VT_LOGICAL_COLUMN_TYPE: flatbuffers::VOffsetT = 6; pub const VT_LOGICAL_COLUMN_TYPE: flatbuffers::VOffsetT = 6;
pub const VT_VALUES_TYPE: flatbuffers::VOffsetT = 8; pub const VT_VALUES_TYPE: flatbuffers::VOffsetT = 8;
pub const VT_VALUES: flatbuffers::VOffsetT = 10; pub const VT_VALUES: flatbuffers::VOffsetT = 10;
pub const VT_NULL_BITMASK: flatbuffers::VOffsetT = 12; pub const VT_NULL_MASK: flatbuffers::VOffsetT = 12;
#[inline] #[inline]
pub fn name(&self) -> Option<&'a str> { pub fn name(&self) -> Option<&'a str> {
@ -1273,10 +1273,10 @@ pub mod influxdata {
) )
} }
#[inline] #[inline]
pub fn null_bitmask(&self) -> Option<&'a [u8]> { pub fn null_mask(&self) -> Option<&'a [u8]> {
self._tab self._tab
.get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, u8>>>( .get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, u8>>>(
Column::VT_NULL_BITMASK, Column::VT_NULL_MASK,
None, None,
) )
.map(|v| v.safe_slice()) .map(|v| v.safe_slice())
@ -1363,7 +1363,7 @@ pub mod influxdata {
_ => Ok(()), _ => Ok(()),
} }
})? })?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>(&"null_bitmask", Self::VT_NULL_BITMASK, false)? .visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>(&"null_mask", Self::VT_NULL_MASK, false)?
.finish(); .finish();
Ok(()) Ok(())
} }
@ -1373,7 +1373,7 @@ pub mod influxdata {
pub logical_column_type: LogicalColumnType, pub logical_column_type: LogicalColumnType,
pub values_type: ColumnValues, pub values_type: ColumnValues,
pub values: Option<flatbuffers::WIPOffset<flatbuffers::UnionWIPOffset>>, pub values: Option<flatbuffers::WIPOffset<flatbuffers::UnionWIPOffset>>,
pub null_bitmask: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, u8>>>, pub null_mask: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, u8>>>,
} }
impl<'a> Default for ColumnArgs<'a> { impl<'a> Default for ColumnArgs<'a> {
#[inline] #[inline]
@ -1383,7 +1383,7 @@ pub mod influxdata {
logical_column_type: LogicalColumnType::IOx, logical_column_type: LogicalColumnType::IOx,
values_type: ColumnValues::NONE, values_type: ColumnValues::NONE,
values: None, values: None,
null_bitmask: None, null_mask: None,
} }
} }
} }
@ -1427,13 +1427,13 @@ pub mod influxdata {
); );
} }
#[inline] #[inline]
pub fn add_null_bitmask( pub fn add_null_mask(
&mut self, &mut self,
null_bitmask: flatbuffers::WIPOffset<flatbuffers::Vector<'b, u8>>, null_mask: flatbuffers::WIPOffset<flatbuffers::Vector<'b, u8>>,
) { ) {
self.fbb_.push_slot_always::<flatbuffers::WIPOffset<_>>( self.fbb_.push_slot_always::<flatbuffers::WIPOffset<_>>(
Column::VT_NULL_BITMASK, Column::VT_NULL_MASK,
null_bitmask, null_mask,
); );
} }
#[inline] #[inline]
@ -1507,7 +1507,7 @@ pub mod influxdata {
ds.field("values", &x) ds.field("values", &x)
} }
}; };
ds.field("null_bitmask", &self.null_bitmask()); ds.field("null_mask", &self.null_mask());
ds.finish() ds.finish()
} }
} }

1592
internal_types/src/entry.rs Normal file

File diff suppressed because it is too large Load Diff

View File

@ -7,6 +7,7 @@
)] )]
pub mod data; pub mod data;
pub mod entry;
pub mod once; pub mod once;
pub mod schema; pub mod schema;
pub mod selection; pub mod selection;