style: wrap comments

Runs rustfmt with the new config.
pull/24376/head
Dom 2020-12-11 18:15:53 +00:00
parent 1446b5fcfc
commit 6f473984d0
74 changed files with 944 additions and 725 deletions

View File

@ -38,8 +38,8 @@ impl ReplicatedWrite {
flatbuffers::get_root::<wb::ReplicatedWrite<'_>>(&self.data)
}
/// Returns the Flatbuffers struct for the WriteBufferBatch in the raw bytes of the
/// payload of the ReplicatedWrite.
/// Returns the Flatbuffers struct for the WriteBufferBatch in the raw bytes
/// of the payload of the ReplicatedWrite.
pub fn write_buffer_batch(&self) -> Option<wb::WriteBufferBatch<'_>> {
match self.to_fb().payload() {
Some(d) => Some(flatbuffers::get_root::<wb::WriteBufferBatch<'_>>(&d)),
@ -191,7 +191,8 @@ pub fn split_lines_into_write_entry_partitions(
.push(line);
}
// create a WALEntry for each batch of lines going to a partition (one WALEntry per partition)
// create a WALEntry for each batch of lines going to a partition (one WALEntry
// per partition)
let entries = partition_writes
.into_iter()
.map(|(key, lines)| add_write_entry(&mut fbb, Some(&key), &lines))

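The hunk above batches parsed lines by partition key so that one WAL entry is written per partition. A minimal standalone sketch of that grouping step; the key function here is a made-up stand-in (it just takes the measurement name), whereas the real key comes from the database's partition template:

use std::collections::BTreeMap;

fn group_by_partition<'a>(lines: &[&'a str]) -> BTreeMap<String, Vec<&'a str>> {
    let mut partition_writes: BTreeMap<String, Vec<&'a str>> = BTreeMap::new();
    for &line in lines {
        // Stand-in key: the measurement name before the first comma or space.
        let key = line
            .split(|c| c == ',' || c == ' ')
            .next()
            .unwrap_or("")
            .to_string();
        partition_writes.entry(key).or_default().push(line);
    }
    partition_writes
}

fn main() {
    let grouped = group_by_partition(&["cpu,host=A usage=1 1", "mem,host=A used=2 1"]);
    assert_eq!(grouped.len(), 2); // one batch (future WAL entry) per partition
}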
View File

@ -15,61 +15,75 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// DatabaseRules contains the rules for replicating data, sending data to subscribers, and
/// querying data for a single database.
/// DatabaseRules contains the rules for replicating data, sending data to
/// subscribers, and querying data for a single database.
#[derive(Debug, Serialize, Deserialize, Default, Eq, PartialEq)]
pub struct DatabaseRules {
/// Template that generates a partition key for each row inserted into the db
/// Template that generates a partition key for each row inserted into the
/// db
pub partition_template: PartitionTemplate,
/// If `store_locally` is set to `true`, this server will store writes and replicated
/// writes in a local write buffer database. This is step #4 from the diagram.
/// If `store_locally` is set to `true`, this server will store writes and
/// replicated writes in a local write buffer database. This is step #4
/// from the diagram.
pub store_locally: bool,
/// The set of host groups that data should be replicated to. Which host a
/// write goes to within a host group is determined by consistent hashing of the partition key.
/// We'd use this to create a host group per availability zone, so you might have 5 availability
/// zones with 2 hosts in each. Replication will ensure that N of those zones get a write. For
/// each zone, only a single host needs to get the write. Replication is for ensuring a write
/// exists across multiple hosts before returning success. Its purpose is to ensure write
/// durability, rather than write availability for query (this is covered by subscriptions).
/// write goes to within a host group is determined by consistent hashing of
/// the partition key. We'd use this to create a host group per
/// availability zone, so you might have 5 availability zones with 2
/// hosts in each. Replication will ensure that N of those zones get a
/// write. For each zone, only a single host needs to get the write.
/// Replication is for ensuring a write exists across multiple hosts
/// before returning success. Its purpose is to ensure write durability,
/// rather than write availability for query (this is covered by
/// subscriptions).
pub replication: Vec<HostGroupId>,
/// The minimum number of host groups to replicate a write to before success is returned. This
/// can be overridden on a per request basis. Replication will continue to write to the other
/// host groups in the background.
/// The minimum number of host groups to replicate a write to before success
/// is returned. This can be overridden on a per request basis.
/// Replication will continue to write to the other host groups in the
/// background.
pub replication_count: u8,
/// How long the replication queue can get before either rejecting writes or dropping missed
/// writes. The queue is kept in memory on a per-database basis. A queue size of zero means it
/// will only try to replicate synchronously and drop any failures.
/// How long the replication queue can get before either rejecting writes or
/// dropping missed writes. The queue is kept in memory on a
/// per-database basis. A queue size of zero means it will only try to
/// replicate synchronously and drop any failures.
pub replication_queue_max_size: usize,
/// `subscriptions` are used for query servers to get data via either push or pull as it
/// arrives. They are separate from replication as they have a different purpose. They're for
/// query servers or other clients that want to subscribe to some subset of data being written
/// in. This could either be specific partitions, ranges of partitions, tables, or rows matching
/// some predicate. This is step #3 from the diagram.
/// `subscriptions` are used for query servers to get data via either push
/// or pull as it arrives. They are separate from replication as they
/// have a different purpose. They're for query servers or other clients
/// that want to subscribe to some subset of data being written in. This
/// could either be specific partitions, ranges of partitions, tables, or
/// rows matching some predicate. This is step #3 from the diagram.
pub subscriptions: Vec<Subscription>,
/// If set to `true`, this server should answer queries from one or more of its local write
/// buffer and any read-only partitions that it knows about. In this case, results
/// will be merged with any others from the remote groups or read-only partitions.
/// If set to `true`, this server should answer queries from one or more of
/// its local write buffer and any read-only partitions that it knows about.
/// In this case, results will be merged with any others from the remote
/// groups or read-only partitions.
pub query_local: bool,
/// Set `primary_query_group` to a host group if remote servers should be issued
/// queries for this database. All hosts in the group should be queried with this server
/// acting as the coordinator that merges results together. If a specific host in the group
/// is unavailable, another host in the same position from a secondary group should be
/// queried. For example, imagine we've partitioned the data in this DB into 4 partitions and
/// we are replicating the data across 3 availability zones. We have 4 hosts in each
/// of those AZs, thus they each have 1 partition. We'd set the primary group to be the 4
/// hosts in the same AZ as this one, and the secondary groups as the hosts in the other
/// 2 AZs.
/// Set `primary_query_group` to a host group if remote servers should be
/// issued queries for this database. All hosts in the group should be
/// queried with this server acting as the coordinator that merges
/// results together. If a specific host in the group is unavailable,
/// another host in the same position from a secondary group should be
/// queried. For example, imagine we've partitioned the data in this DB into
/// 4 partitions and we are replicating the data across 3 availability
/// zones. We have 4 hosts in each of those AZs, thus they each have 1
/// partition. We'd set the primary group to be the 4 hosts in the same
/// AZ as this one, and the secondary groups as the hosts in the other 2
/// AZs.
pub primary_query_group: Option<HostGroupId>,
pub secondary_query_groups: Vec<HostGroupId>,
/// Use `read_only_partitions` when a server should answer queries for partitions that
/// come from object storage. This can be used to start up a new query server to handle
/// queries by pointing it at a collection of partitions and then telling it to also pull
/// data from the replication servers (writes that haven't been snapshotted into a partition).
/// Use `read_only_partitions` when a server should answer queries for
/// partitions that come from object storage. This can be used to start
/// up a new query server to handle queries by pointing it at a
/// collection of partitions and then telling it to also pull
/// data from the replication servers (writes that haven't been snapshotted
/// into a partition).
pub read_only_partitions: Vec<PartitionId>,
/// When set this will buffer WAL writes in memory based on the configuration.
/// When set this will buffer WAL writes in memory based on the
/// configuration.
pub wal_buffer_config: Option<WalBufferConfig>,
}
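A minimal sketch of filling in a few of these fields, assuming `DatabaseRules` and its derived `Default` are in scope and that `HostGroupId` is a string-like alias (its definition is not shown in this diff); the host group name is hypothetical:

fn example_rules() -> DatabaseRules {
    DatabaseRules {
        // Store writes locally and answer queries from the local write buffer.
        store_locally: true,
        query_local: true,
        // Replicate each write to one host group (name is made up).
        replication: vec!["host-group-a".to_string()],
        replication_count: 1,
        // Everything else keeps the derived defaults.
        ..Default::default()
    }
}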
@ -83,50 +97,54 @@ impl DatabaseRules {
}
}
/// WalBufferConfig defines the configuration for buffering data from the WAL in memory. This
/// buffer is used for asynchronous replication and to collect segments before sending them to
/// object storage.
/// WalBufferConfig defines the configuration for buffering data from the WAL in
/// memory. This buffer is used for asynchronous replication and to collect
/// segments before sending them to object storage.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct WalBufferConfig {
/// The size the WAL buffer should be limited to. Once the buffer gets to this size it will
/// drop old segments to remain below this size, but still try to hold as much in memory as
/// possible while remaining below this threshold
/// The size the WAL buffer should be limited to. Once the buffer gets to
/// this size it will drop old segments to remain below this size, but
/// still try to hold as much in memory as possible while remaining
/// below this threshold
pub buffer_size: Option<u64>,
/// WAL segments become read-only after crossing over this size, which means that segments will
/// always be >= this size. When old segments are dropped from memory, at least this much
/// space will be freed from the buffer.
/// WAL segments become read-only after crossing over this size, which means
/// that segments will always be >= this size. When old segments are
/// dropped from memory, at least this much space will be freed from
/// the buffer.
pub segment_size: Option<u64>,
/// What should happen if a write comes in that would exceed the WAL buffer size and the
/// oldest segment that could be dropped hasn't yet been persisted to object storage. If the
/// oldest segment has been persisted, then it will be dropped from the buffer so that new writes
/// can be accepted. This option is only for defining the behavior of what happens if that segment
/// hasn't been persisted.
/// What should happen if a write comes in that would exceed the WAL buffer
/// size and the oldest segment that could be dropped hasn't yet been
/// persisted to object storage. If the oldest segment has been
/// persisted, then it will be dropped from the buffer so that new writes
/// can be accepted. This option is only for defining the behavior of what
/// happens if that segment hasn't been persisted.
pub buffer_rollover: WalBufferRollover,
}
/// WalBufferRollover defines the behavior of what should happen if a write comes in that would
/// cause the buffer to exceed its max size AND the oldest segment can't be dropped because it
/// has not yet been persisted.
/// WalBufferRollover defines the behavior of what should happen if a write
/// comes in that would cause the buffer to exceed its max size AND the oldest
/// segment can't be dropped because it has not yet been persisted.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub enum WalBufferRollover {
/// Drop the old segment even though it hasn't been persisted. This part of the WAL will
/// be lost on this server.
/// Drop the old segment even though it hasn't been persisted. This part of
/// the WAL will be lost on this server.
DropOldSegment,
/// Drop the incoming write and fail silently. This favors making sure that older WAL data
/// will be backed up.
/// Drop the incoming write and fail silently. This favors making sure that
/// older WAL data will be backed up.
DropIncoming,
/// Reject the incoming write and return an error. The client may retry the request, which
/// will succeed once the oldest segment has been persisted to object storage.
/// Reject the incoming write and return an error. The client may retry the
/// request, which will succeed once the oldest segment has been
/// persisted to object storage.
ReturnError,
}
/// `PartitionTemplate` is used to compute the partition key of each row that gets written. It
/// can consist of the table name, a column name and its value, a formatted time, or a string
/// column and regex captures of its value. For columns that do not appear in the input row,
/// a blank value is output.
/// `PartitionTemplate` is used to compute the partition key of each row that
/// gets written. It can consist of the table name, a column name and its value,
/// a formatted time, or a string column and regex captures of its value. For
/// columns that do not appear in the input row, a blank value is output.
///
/// The key is constructed in order of the template parts; thus ordering changes what partition
/// key is generated.
/// The key is constructed in order of the template parts; thus ordering changes
/// what partition key is generated.
#[derive(Debug, Serialize, Deserialize, Default, Eq, PartialEq)]
pub struct PartitionTemplate {
parts: Vec<TemplatePart>,
@ -162,7 +180,8 @@ impl PartitionTemplate {
}
}
/// `TemplatePart` specifies what part of a row should be used to compute this part of a partition key.
/// `TemplatePart` specifies what part of a row should be used to compute this
/// part of a partition key.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum TemplatePart {
Table,
@ -172,36 +191,38 @@ pub enum TemplatePart {
StrftimeColumn(StrftimeColumn),
}
/// `RegexCapture` is for pulling parts of a string column into the partition key.
/// `RegexCapture` is for pulling parts of a string column into the partition
/// key.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct RegexCapture {
column: String,
regex: String,
}
/// `StrftimeColumn` can be used to create a time based partition key off some column other than
/// the builtin `time` column.
/// `StrftimeColumn` can be used to create a time based partition key off some
/// column other than the builtin `time` column.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct StrftimeColumn {
column: String,
format: String,
}
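A standalone sketch of the behaviour described above: the template parts are evaluated in order, a column missing from the row contributes a blank value, and the pieces are concatenated into the key. The `Part` enum, the `-` separator, and the time handling here are simplifications for illustration, not the crate's actual implementation:

enum Part {
    Table,
    Column(String),
    TimeFormat(String),
}

fn partition_key(parts: &[Part], table: &str, row: &[(&str, &str)], time: &str) -> String {
    parts
        .iter()
        .map(|part| match part {
            Part::Table => table.to_string(),
            // Columns missing from the row contribute a blank value.
            Part::Column(name) => row
                .iter()
                .find(|&&(col, _)| col == name.as_str())
                .map(|&(_, value)| value.to_string())
                .unwrap_or_default(),
            // Stand-in for strftime formatting of a time column.
            Part::TimeFormat(_format) => time.to_string(),
        })
        .collect::<Vec<_>>()
        .join("-")
}

fn main() {
    let template = [
        Part::Table,
        Part::Column("region".to_string()),
        Part::TimeFormat("%Y-%m-%d".to_string()),
    ];
    let key = partition_key(&template, "cpu", &[("region", "us-west")], "2020-12-11");
    assert_eq!(key, "cpu-us-west-2020-12-11");
}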
/// `PartitionId` is the object storage identifier for a specific partition. It should be a
/// path that can be used against an object store to locate all the files and subdirectories
/// for a partition. It takes the form of `/<writer ID>/<database>/<partition key>/`.
/// `PartitionId` is the object storage identifier for a specific partition. It
/// should be a path that can be used against an object store to locate all the
/// files and subdirectories for a partition. It takes the form of `/<writer
/// ID>/<database>/<partition key>/`.
pub type PartitionId = String;
pub type WriterId = u32;
/// `Subscription` represents a group of hosts that want to receive data as it arrives.
/// The subscription has a matcher that is used to determine what data will match it, and
/// an optional queue for storing matched writes. Subscribers that receive some subset
/// of an individual replicated write will get a new replicated write, but with the same
/// originating writer ID and sequence number for the consuming subscriber's tracking
/// purposes.
/// `Subscription` represents a group of hosts that want to receive data as it
/// arrives. The subscription has a matcher that is used to determine what data
/// will match it, and an optional queue for storing matched writes. Subscribers
/// that receive some subset of an individual replicated write will get a new
/// replicated write, but with the same originating writer ID and sequence
/// number for the consuming subscriber's tracking purposes.
///
/// For pull based subscriptions, the requester will send a matcher, which the receiver
/// will execute against its in-memory WAL.
/// For pull based subscriptions, the requester will send a matcher, which the
/// receiver will execute against its in-memory WAL.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct Subscription {
pub name: String,

View File

@ -1,6 +1,6 @@
//! This crate contains the data types that are shared between InfluxDB IOx servers including
//! replicated data, rules for how data is split up and queried, and what gets stored
//! in the write buffer database.
//! This crate contains the data types that are shared between InfluxDB IOx
//! servers including replicated data, rules for how data is split up and
//! queried, and what gets stored in the write buffer database.
#![deny(rust_2018_idioms)]
#![warn(

View File

@ -1,15 +1,16 @@
//! This module contains structs that describe the metadata for a partition including schema,
//! summary statistics, and file locations in storage.
//! This module contains structs that describe the metadata for a partition
//! including schema, summary statistics, and file locations in storage.
use std::fmt::{Debug, Display};
use serde::{Deserialize, Serialize};
/// Describes the schema, summary statistics for each column in each table and the location of
/// the partition in storage.
/// Describes the schema, summary statistics for each column in each table and
/// the location of the partition in storage.
#[derive(Debug, Deserialize, Serialize, PartialEq)]
pub struct Partition {
/// The identifier for the partition, the partition key computed from PartitionRules
/// The identifier for the partition, the partition key computed from
/// PartitionRules
pub key: String,
/// The tables in this partition
pub tables: Vec<Table>,
@ -77,7 +78,8 @@ where
}
impl Statistics<String> {
/// Function for string stats to avoid allocating if we're not updating min or max
/// Function for string stats to avoid allocating if we're not updating min
/// or max
pub fn update_string(stats: &mut Self, other: &str) {
stats.count += 1;

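A standalone sketch of the idea behind `update_string` above: fold a borrowed `&str` into running string statistics and only allocate a new `String` when the value actually replaces the current min or max. The field names other than `count` are assumptions, since the hunk only shows the start of the function:

struct StringStats {
    count: u64,
    min: String,
    max: String,
}

fn update_string(stats: &mut StringStats, other: &str) {
    stats.count += 1;
    if other < stats.min.as_str() {
        // Only allocate when the minimum actually changes.
        stats.min = other.to_string();
    }
    if other > stats.max.as_str() {
        stats.max = other.to_string();
    }
}

fn main() {
    let mut stats = StringStats { count: 1, min: "cpu".to_string(), max: "cpu".to_string() };
    update_string(&mut stats, "mem");  // updates max, allocates once
    update_string(&mut stats, "disk"); // between min and max, no allocation
    assert_eq!((stats.count, stats.min.as_str(), stats.max.as_str()), (3, "cpu", "mem"));
}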
View File

@ -57,7 +57,8 @@ impl Tag {
}
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
/// Line Protocol Data Types as defined in [the InfluxData documentation][influx]
/// Line Protocol Data Types as defined in [the InfluxData
/// documentation][influx]
///
/// [influx]: https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/#data-types
pub enum DataType {
@ -128,7 +129,8 @@ impl Schema {
&self.measurement
}
/// Return true if `col_def` holds values for a Tag (as opposed to Field or Timestamp)
/// Return true if `col_def` holds values for a Tag (as opposed to Field or
/// Timestamp)
pub fn is_tag(&self, col_def: &ColumnDefinition) -> bool {
self.tags.contains_key(&col_def.name)
}
@ -218,7 +220,8 @@ impl SchemaBuilder {
self
}
/// Create a new schema from a list of tag names and (field_name, data_type) pairs
/// Create a new schema from a list of tag names and (field_name, data_type)
/// pairs
pub fn build(self) -> Schema {
// assign column indexes to all columns, starting at 0
let mut indexer = 0..;

View File

@ -22,7 +22,8 @@ fn main() -> Result<()> {
/// Schema used with IOx specific gRPC requests
///
/// Creates `influxdata.platform.storage.rs` and `com.github.influxdata.idpe.storage.read.rs`
/// Creates `influxdata.platform.storage.rs` and
/// `com.github.influxdata.idpe.storage.read.rs`
fn generate_grpc_types(root: &Path) -> Result<()> {
let proto_files = vec![
root.join("test.proto"),

View File

@ -1,5 +1,6 @@
// This crate deliberately does not use the same linting rules as the other crates because of all
// the generated code it contains that we don't have much control over.
// This crate deliberately does not use the same linting rules as the other
// crates because of all the generated code it contains that we don't have much
// control over.
#![allow(
unused_imports,
clippy::redundant_static_lifetimes,

View File

@ -6,7 +6,8 @@ use std::{collections::BTreeMap, io};
/// Errors that occur while building `DataPoint`s
#[derive(Debug, Snafu)]
pub enum DataPointError {
/// Returned when calling `build` on a `DataPointBuilder` that has no fields.
/// Returned when calling `build` on a `DataPointBuilder` that has no
/// fields.
#[snafu(display(
"All `DataPoints` must have at least one field. Builder contains: {:?}",
data_point_builder
@ -184,22 +185,24 @@ impl From<String> for FieldValue {
/// Transform a type into valid line protocol lines
///
/// This trait is to enable the conversion of `DataPoint`s to line protocol; it is unlikely that
/// you would need to implement this trait. In the future, a `derive` crate may exist that would
/// facilitate the generation of implementations of this trait on custom types to help uphold the
/// This trait is to enable the conversion of `DataPoint`s to line protocol; it
/// is unlikely that you would need to implement this trait. In the future, a
/// `derive` crate may exist that would facilitate the generation of
/// implementations of this trait on custom types to help uphold the
/// responsibilities for escaping and producing complete lines.
pub trait WriteDataPoint {
/// Write this data point as line protocol. The implementor is responsible for
/// properly escaping the data and ensuring that complete lines
/// Write this data point as line protocol. The implementor is responsible
/// for properly escaping the data and ensuring that complete lines
/// are generated.
fn write_data_point_to<W>(&self, w: W) -> io::Result<()>
where
W: io::Write;
}
// The following are traits rather than free functions so that we can limit their implementations
// to only the data types supported for each of measurement, tag key, tag value, field key, field
// value, and timestamp. They are a private implementation detail and any custom implementations
// The following are traits rather than free functions so that we can limit
// their implementations to only the data types supported for each of
// measurement, tag key, tag value, field key, field value, and timestamp. They
// are a private implementation detail and any custom implementations
// of these traits would be generated by a future derive trait.
trait WriteMeasurement {
fn write_measurement_to<W>(&self, w: W) -> io::Result<()>

View File

@ -24,8 +24,9 @@
//! ## Quick start
//!
//! This example creates a client to an InfluxDB server running at `http://localhost:8888`, creates
//! a bucket with the name "mybucket" in the organization with name "myorg" and ID
//! "0000111100001111", builds two points, and writes the points to the bucket.
//! a bucket with the name "mybucket" in the organization with name "myorg" and
//! ID "0000111100001111", builds two points, and writes the points to the
//! bucket.
//!
//! ```
//! async fn example() -> Result<(), Box<dyn std::error::Error>> {
@ -73,15 +74,15 @@ pub use data_point::{DataPoint, FieldValue, WriteDataPoint};
/// Errors that occur while making requests to the Influx server.
#[derive(Debug, Snafu)]
pub enum RequestError {
/// While making a request to the Influx server, the underlying `reqwest` library returned an
/// error that was not an HTTP 400 or 500.
/// While making a request to the Influx server, the underlying `reqwest`
/// library returned an error that was not an HTTP 400 or 500.
#[snafu(display("Error while processing the HTTP request: {}", source))]
ReqwestProcessing {
/// The underlying error object from `reqwest`.
source: reqwest::Error,
},
/// The underlying `reqwest` library returned an HTTP error with code 400 (meaning a client
/// error) or 500 (meaning a server error).
/// The underlying `reqwest` library returned an HTTP error with code 400
/// (meaning a client error) or 500 (meaning a server error).
#[snafu(display("HTTP request returned an error: {}, `{}`", status, text))]
Http {
/// The `StatusCode` returned from the request
@ -90,8 +91,8 @@ pub enum RequestError {
text: String,
},
/// While serializing data as JSON to send in a request, the underlying `serde_json` library
/// returned an error.
/// While serializing data as JSON to send in a request, the underlying
/// `serde_json` library returned an error.
#[snafu(display("Error while serializing to JSON: {}", source))]
Serializing {
/// The underlying error object from `serde_json`.
@ -109,8 +110,9 @@ pub struct Client {
}
impl Client {
/// Create a new client pointing to the URL specified in `protocol://server:port` format and
/// using the specified token for authorization.
/// Create a new client pointing to the URL specified in
/// `protocol://server:port` format and using the specified token for
/// authorization.
///
/// # Example
///
@ -159,7 +161,8 @@ impl Client {
Ok(())
}
/// Write a `Stream` of `DataPoint`s to the specified organization and bucket.
/// Write a `Stream` of `DataPoint`s to the specified organization and
/// bucket.
pub async fn write(
&self,
org: &str,
@ -180,8 +183,8 @@ impl Client {
Ok(self.write_line_protocol(org, bucket, body).await?)
}
/// Create a new bucket in the organization specified by the 16-digit hexadecimal `org_id` and
/// with the bucket name `bucket`.
/// Create a new bucket in the organization specified by the 16-digit
/// hexadecimal `org_id` and with the bucket name `bucket`.
pub async fn create_bucket(&self, org_id: &str, bucket: &str) -> Result<(), RequestError> {
let create_bucket_url = format!("{}/api/v2/buckets", self.url);
@ -262,10 +265,11 @@ cpu,host=server01,region=us-west usage=0.87
.build()?,
];
// If the requests made are incorrect, Mockito returns status 501 and `write` will return
// an error, which causes the test to fail here instead of when we assert on mock_server.
// The error messages that Mockito provides are much clearer for explaining why a test
// failed than just that the server returned 501, so don't use `?` here.
// If the requests made are incorrect, Mockito returns status 501 and `write`
// will return an error, which causes the test to fail here instead of
// when we assert on mock_server. The error messages that Mockito
// provides are much clearer for explaining why a test failed than just
// that the server returned 501, so don't use `?` here.
let _result = client.write(org, bucket, stream::iter(points)).await;
mock_server.assert();

View File

@ -1,5 +1,5 @@
//! This module contains a pure rust implementation of a parser for InfluxDB Line Protocol
//! https://v2.docs.influxdata.com/v2.0/reference/syntax/line-protocol/
//! This module contains a pure rust implementation of a parser for InfluxDB
//! Line Protocol https://v2.docs.influxdata.com/v2.0/reference/syntax/line-protocol/
//!
//! This implementation is intended to be compatible with the Go implementation,
//! https://github.com/influxdata/influxdb/blob/217eddc87e14a79b01d0c22994fc139f530094a2/models/points_parser.go
@ -148,8 +148,8 @@ pub struct ParsedLine<'a> {
}
impl<'a> ParsedLine<'a> {
/// Total number of columns on this line, including fields, tags, and timestamp (which is
/// always present).
/// Total number of columns on this line, including fields, tags, and
/// timestamp (which is always present).
///
/// ```
/// use influxdb_line_protocol::{ParsedLine, FieldValue};
@ -367,7 +367,6 @@ impl<'a> Display for FieldValue<'a> {
/// For example the 8 character string `Foo\\Bar` (note the double
/// `\\`) is parsed into the logical 7 character string `Foo\Bar`
/// (note the single `\`)
///
#[derive(Debug, Clone, Eq)]
pub enum EscapedStr<'a> {
SingleSlice(&'a str),
@ -685,7 +684,8 @@ fn timestamp(i: &str) -> IResult<&str, i64> {
fn field_string_value(i: &str) -> IResult<&str, EscapedStr<'_>> {
// https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/#data-types
// For string field values, backslash is only used to escape itself(\) or double quotes.
// For string field values, backslash is only used to escape itself(\) or double
// quotes.
let string_data = alt((
map(tag(r#"\""#), |_| r#"""#), // escaped double quote -> double quote
map(tag(r#"\\"#), |_| r#"\"#), // escaped backslash --> single backslash
@ -707,7 +707,8 @@ fn field_string_value(i: &str) -> IResult<&str, EscapedStr<'_>> {
fn field_bool_value(i: &str) -> IResult<&str, bool> {
// https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/#data-types
// "specify TRUE with t, T, true, True, or TRUE. Specify FALSE with f, F, false, False, or FALSE
// "specify TRUE with t, T, true, True, or TRUE. Specify FALSE with f, F, false,
// False, or FALSE
alt((
map(tag("true"), |_| true),
map(tag("True"), |_| true),
@ -817,7 +818,8 @@ where
result.push(escape_char);
head = after;
// The Go parser assumes that *any* unknown escaped character is valid.
// The Go parser assumes that *any* unknown escaped character is
// valid.
match head.chars().next() {
Some(c) => {
let (escaped, remaining) = head.split_at(c.len_utf8());

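A standalone sketch of the string-field escaping rules quoted above: inside a string field value a backslash only escapes itself and double quotes, and any other backslash sequence is left as-is (mirroring the "any unknown escaped character is valid" behaviour of the Go parser). This is illustration only, not the parser's actual nom combinators:

fn unescape_string_field(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    let mut chars = s.chars();
    while let Some(c) = chars.next() {
        if c != '\\' {
            out.push(c);
            continue;
        }
        match chars.next() {
            Some('"') => out.push('"'),   // escaped double quote -> double quote
            Some('\\') => out.push('\\'), // escaped backslash -> single backslash
            // Any other escape sequence is kept as-is.
            Some(other) => {
                out.push('\\');
                out.push(other);
            }
            None => out.push('\\'),
        }
    }
    out
}

fn main() {
    assert_eq!(unescape_string_field(r#"say \"hi\" \now"#), r#"say "hi" \now"#);
}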
View File

@ -3,16 +3,16 @@ use std::{cmp, convert::TryInto, error::Error};
/// The header consists of one byte indicating the compression type.
const HEADER_LEN: usize = 1;
/// A bit packed format using 1 bit per boolean. This is the only available boolean compression
/// format at this time.
/// A bit packed format using 1 bit per boolean. This is the only available
/// boolean compression format at this time.
const BOOLEAN_COMPRESSED_BIT_PACKED: u8 = 1;
/// Encodes a slice of booleans into `dst`.
///
/// Boolean encoding uses 1 bit per value. Each compressed byte slice contains a 1 byte header
/// indicating the compression type, followed by a variable byte encoded length indicating
/// how many booleans are packed in the slice. The remaining bytes contain 1 byte for every
/// 8 boolean values encoded.
/// Boolean encoding uses 1 bit per value. Each compressed byte slice contains a
/// 1 byte header indicating the compression type, followed by a variable byte
/// encoded length indicating how many booleans are packed in the slice. The
/// remaining bytes contain 1 byte for every 8 boolean values encoded.
pub fn encode(src: &[bool], dst: &mut Vec<u8>) -> Result<(), Box<dyn Error>> {
dst.clear();
if src.is_empty() {
@ -38,7 +38,8 @@ pub fn encode(src: &[bool], dst: &mut Vec<u8>) -> Result<(), Box<dyn Error>> {
if v {
dst[index] |= 128 >> (n & 7); // Set current bit on current byte.
} else {
dst[index] &= !(128 >> (n & 7)); // Clear current bit on current byte.
dst[index] &= !(128 >> (n & 7)); // Clear current bit on current
// byte.
}
n += 1;
}
@ -75,7 +76,8 @@ pub fn decode(src: &[u8], dst: &mut Vec<bool>) -> Result<(), Box<dyn Error>> {
let min = src.len() * 8;
// Shouldn't happen - TSM file was truncated/corrupted. This is what the Go code does
// Shouldn't happen - TSM file was truncated/corrupted. This is what the Go code
// does
count = cmp::min(min, count);
if dst.capacity() < count {

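A round-trip sketch for the boolean encoder described above, assuming the `encode` and `decode` functions from this file are in scope (their module path is not visible in the diff). Ten values should produce a one-byte header, a varint count, and two payload bytes at one bit per boolean:

fn roundtrip() -> Result<(), Box<dyn std::error::Error>> {
    let src = vec![true, false, true, true, false, false, true, false, true, true];

    // 1-byte header + varint count + ceil(10 / 8) = 2 payload bytes.
    let mut encoded = vec![];
    encode(&src, &mut encoded)?;

    let mut decoded = vec![];
    decode(&encoded, &mut decoded)?;
    assert_eq!(decoded, src);
    Ok(())
}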
View File

@ -26,7 +26,8 @@ pub fn encode(src: &[f64], dst: &mut Vec<u8>) -> Result<(), Box<dyn Error>> {
return Ok(());
}
if dst.capacity() < 9 {
dst.reserve_exact(9 - dst.capacity()); // room for encoding type, block size and a value
dst.reserve_exact(9 - dst.capacity()); // room for encoding type, block
// size and a value
}
// write encoding type

View File

@ -97,8 +97,8 @@ fn i64_to_u64_vector(src: &[i64]) -> Vec<u64> {
// encode_rle encodes the value v, delta and count into dst.
//
// v should be the first element of a sequence, delta the difference that each
// value in the sequence differs by, and count the number of times that the delta
// is repeated.
// value in the sequence differs by, and count the number of times that the
// delta is repeated.
fn encode_rle(v: u64, delta: u64, count: u64, dst: &mut Vec<u8>) {
use super::MAX_VAR_INT_64;
dst.push(0); // save a byte for encoding type

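A standalone sketch of what the `(v, delta, count)` triple above represents, following the wording of the comment (count is how many times the delta is repeated); `expand_rle` is a hypothetical helper and the actual varint byte layout written by `encode_rle` is not reproduced here:

fn expand_rle(v: u64, delta: u64, count: u64) -> Vec<u64> {
    // The first value, followed by `count` further applications of the delta.
    (0..=count).map(|i| v + delta * i).collect()
}

fn main() {
    assert_eq!(expand_rle(1000, 10, 3), vec![1000, 1010, 1020, 1030]);
}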
View File

@ -1,16 +1,16 @@
use integer_encoding::VarInt;
use std::{convert::TryInto, error::Error};
/// A compressed encoding using Snappy compression. Snappy is the only available string compression
/// format at this time.
/// A compressed encoding using Snappy compression. Snappy is the only available
/// string compression format at this time.
const STRING_COMPRESSED_SNAPPY: u8 = 1;
/// The header consists of one byte indicating the compression type.
const HEADER_LEN: usize = 1;
/// Store `i32::MAX` as a `usize` for comparing with lengths in assertions
const MAX_I32: usize = i32::MAX as usize;
/// Encodes a slice of byte slices representing string data into a vector of bytes. Currently uses
/// Snappy compression.
/// Encodes a slice of byte slices representing string data into a vector of
/// bytes. Currently uses Snappy compression.
pub fn encode(src: &[&[u8]], dst: &mut Vec<u8>) -> Result<(), Box<dyn Error>> {
dst.clear(); // reset buffer
if src.is_empty() {
@ -69,8 +69,9 @@ pub fn encode(src: &[&[u8]], dst: &mut Vec<u8>) -> Result<(), Box<dyn Error>> {
Ok(())
}
/// Decodes a slice of bytes representing Snappy-compressed data into a vector of vectors of bytes
/// representing string data, which may or may not be valid UTF-8.
/// Decodes a slice of bytes representing Snappy-compressed data into a vector
/// of vectors of bytes representing string data, which may or may not be valid
/// UTF-8.
pub fn decode(src: &[u8], dst: &mut Vec<Vec<u8>>) -> Result<(), Box<dyn Error>> {
if src.is_empty() {
return Ok(());

View File

@ -12,10 +12,10 @@ enum Encoding {
/// encode encodes a vector of signed integers into a slice of bytes.
///
/// To maximise compression, the provided vector should be sorted in ascending
/// order. First deltas between the integers are determined, then further encoding
/// is potentially carried out. If all the deltas are the same the block can be
/// encoded using RLE. If not, as long as the deltas are not bigger than simple8b::MAX_VALUE
/// they can be encoded using simple8b.
/// order. First deltas between the integers are determined, then further
/// encoding is potentially carried out. If all the deltas are the same the
/// block can be encoded using RLE. If not, as long as the deltas are not bigger
/// than simple8b::MAX_VALUE they can be encoded using simple8b.
pub fn encode(src: &[i64], dst: &mut Vec<u8>) -> Result<(), Box<dyn Error>> {
dst.clear(); // reset buffer.
if src.is_empty() {

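A standalone sketch of the decision described above, with the delta computation made explicit; zig-zag encoding, the `simple8b::MAX_VALUE` bound, and the actual bit packing are all omitted for brevity:

fn choose_encoding(src: &[i64]) -> &'static str {
    // First, deltas between consecutive values are determined...
    let deltas: Vec<i64> = src.windows(2).map(|w| w[1] - w[0]).collect();
    // ...then RLE is possible when every delta is identical.
    if deltas.windows(2).all(|w| w[0] == w[1]) {
        "rle"
    } else {
        "simple8b"
    }
}

fn main() {
    assert_eq!(choose_encoding(&[10, 20, 30, 40]), "rle");      // constant delta
    assert_eq!(choose_encoding(&[10, 11, 30, 31]), "simple8b"); // varying deltas
}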
View File

@ -75,10 +75,12 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
///
/// The format looks roughly like:
///
/// <org_id bucket_id>,\x00=<measurement>,<tag_keys_str>,\xff=<field_key_str>#!~#<field_key_str>
/// <org_id bucket_id>,\x00=<measurement>,<tag_keys_str>,\xff=<field_key_str>#!
/// ~#<field_key_str>
///
/// For example:
/// <org_id bucket_id>,\x00=http_api_request_duration_seconds,status=2XX,\xff=sum#!~#sum
/// <org_id bucket_id>,\x00=http_api_request_duration_seconds,status=2XX,\
/// xff=sum#!~#sum
///
/// measurement = "http_api_request"
/// tags = [("status", "2XX")]
@ -160,7 +162,8 @@ fn parse_tsm_key_internal(key: &[u8]) -> Result<ParsedTSMKey, DataError> {
///
/// Example: sum#!~#sum means 'sum' field key
///
/// It also turns out that the data after the delimiter does not necessarily escape the data.
/// It also turns out that the data after the delimiter does not necessarily
/// escape the data.
///
/// So for example, the following is a valid field key value (for the
/// field named "Code Cache"):
@ -639,7 +642,8 @@ mod tests {
#[test]
fn parse_tsm_key_good() {
//<org_id bucket_id>,\x00=<measurement>,<tag_keys_str>,\xff=<field_key_str>#!~#<field_key_str>
//<org_id bucket_id>,\x00=<measurement>,<tag_keys_str>,\xff=<field_key_str>#!~#
//<org_id <field_key_str>
let mut key = make_tsm_key_prefix("m", "tag1=val1,tag2=val2");
key = add_field_key(key, "f");
@ -686,7 +690,8 @@ mod tests {
#[test]
fn parse_tsm_key_two_fields() {
//<org_id bucket_id>,\x00=<measurement>,<tag_keys_str>\xff=<field-key_str>#!~#<field_key_str>\xff=<field-key_str>#!~#<field_key_str>
//<org_id bucket_id>,\x00=<measurement>,<tag_keys_str>\xff=<field-key_str>#!~#
//<org_id <field_key_str>\xff=<field-key_str>#!~#<field_key_str>
let mut key = make_tsm_key_prefix("m", "tag1=val1,tag2=val2");
key = add_field_key(key, "f");
key = add_field_key(key, "f2");
@ -703,7 +708,9 @@ mod tests {
#[test]
fn test_parse_tsm_key() {
//<org_id bucket_id>,\x00=http_api_request_duration_seconds,handler=platform,method=POST,path=/api/v2/setup,status=2XX,user_agent=Firefox,\xff=sum#!~#sum
//<org_id bucket_id>,\x00=http_api_request_duration_seconds,handler=platform,
//<org_id method=POST,path=/api/v2/setup,status=2XX,user_agent=Firefox,\xff=sum#
//<org_id !~#sum
let buf = "05C19117091A100005C19117091A10012C003D68747470\
5F6170695F726571756573745F6475726174696F6E5F73\
65636F6E64732C68616E646C65723D706C6174666F726D\
@ -732,7 +739,16 @@ mod tests {
#[test]
fn parse_tsm_key_escaped() {
//<org_id bucket_id>,\x00=query_log,env=prod01-eu-central-1,error=memory\ allocation\ limit\ reached:\ limit\ 740000000\ bytes\,\ allocated:\ 739849088\,\ wanted:\ 6946816;\ memory\ allocation\ limit\ reached:\ limit\ 740000000\ bytes\,\ allocated:\ 739849088\,\ wanted:\ 6946816,errorCode=invalid,errorType=user,host=queryd-algow-rw-76d68d5968-fzgwr,hostname=queryd-algow-rw-76d68d5968-fzgwr,nodename=ip-10-153-10-221.eu-central-1.compute.internal,orgID=0b6e852e272ffdd9,ot_trace_sampled=false,role=queryd-algow-rw,source=hackney,\xff=responseSize#!~#responseSize
//<org_id bucket_id>,\x00=query_log,env=prod01-eu-central-1,error=memory\
//<org_id allocation\ limit\ reached:\ limit\ 740000000\ bytes\,\ allocated:\
//<org_id 739849088\,\ wanted:\ 6946816;\ memory\ allocation\ limit\ reached:\
//<org_id limit\ 740000000\ bytes\,\ allocated:\ 739849088\,\ wanted:\
//<org_id 6946816,errorCode=invalid,errorType=user,
//<org_id host=queryd-algow-rw-76d68d5968-fzgwr,
//<org_id hostname=queryd-algow-rw-76d68d5968-fzgwr,nodename=ip-10-153-10-221.
//<org_id eu-central-1.compute.internal,orgID=0b6e852e272ffdd9,
//<org_id ot_trace_sampled=false,role=queryd-algow-rw,source=hackney,\
//<org_id xff=responseSize#!~#responseSize
let buf = "844910ECE80BE8BC3C0BD4C89186CA892C\
003D71756572795F6C6F672C656E763D70726F6430312D65752D63656E747261\
6C2D312C6572726F723D6D656D6F72795C20616C6C6F636174696F6E5C206C69\

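A standalone sketch of just the `#!~#` field-key delimiter described above (the full parser also handles escaping and the binary org/bucket prefix, none of which is reproduced here); taking the text before the delimiter as the field name follows the `sum#!~#sum` example:

fn field_key(field_part: &str) -> Option<&str> {
    // "sum#!~#sum" -> Some("sum"); no delimiter -> None.
    let mut parts = field_part.splitn(2, "#!~#");
    let name = parts.next()?;
    parts.next().map(|_| name)
}

fn main() {
    assert_eq!(field_key("sum#!~#sum"), Some("sum"));
    assert_eq!(field_key("no-delimiter"), None);
}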
View File

@ -62,7 +62,8 @@ pub struct Block {
impl Block {
/// Determines if this block overlaps the provided block.
///
/// Blocks overlap when the time-range of the data within the block can overlap.
/// Blocks overlap when the time-range of the data within the block can
/// overlap.
pub fn overlaps(&self, other: &Self) -> bool {
self.min_time <= other.max_time && other.min_time <= self.max_time
}
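The predicate above is the standard closed-interval overlap test. A standalone restatement on plain `(min_time, max_time)` pairs, just to illustrate the edge cases:

fn overlaps(a: (i64, i64), b: (i64, i64)) -> bool {
    // Mirrors the check above: a starts before b ends, and b starts before a ends.
    a.0 <= b.1 && b.0 <= a.1
}

fn main() {
    assert!(overlaps((0, 10), (10, 20)));  // ranges that touch at a point overlap
    assert!(!overlaps((0, 10), (11, 20))); // fully disjoint ranges do not
}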

View File

@ -17,7 +17,6 @@ use std::iter::Peekable;
/// The main purpose of the `TSMMeasurementMapper` is to provide a
/// transformation step that allows one to convert per-series/per-field data
/// into measurement-oriented table data.
///
#[derive(Debug)]
pub struct TSMMeasurementMapper<R>
where
@ -36,7 +35,8 @@ where
}
}
/// either assign a value from a `Result` or return an error wrapped in an Option.
/// either assign a value from a `Result` or return an error wrapped in an
/// Option.
macro_rules! try_or_some {
($e:expr) => {
match $e {
@ -82,17 +82,19 @@ impl<R: Read + Seek> Iterator for TSMMeasurementMapper<R> {
}
Err(e) => return Some(Err(e.clone())),
}
self.iter.next(); // advance iterator - we got what we needed from the peek
self.iter.next(); // advance iterator - we got what we needed from
// the peek
}
Some(Ok(measurement)) // final measurement in index.
}
}
/// FieldKeyBlocks is a mapping between a set of field keys and all of the blocks
/// for those keys.
/// FieldKeyBlocks is a mapping between a set of field keys and all of the
/// blocks for those keys.
pub type FieldKeyBlocks = BTreeMap<String, Vec<Block>>;
/// A collection of related blocks, fields and tag-sets for a single measurement.
/// A collection of related blocks, fields and tag-sets for a single
/// measurement.
///
/// A `MeasurementTable` should be derived from a single TSM index (file).
/// Given a single series key, an invariant is that none of the blocks for that
@ -231,7 +233,6 @@ impl MeasurementTable {
/// files) it is possible that blocks for the same tagset and field will
/// overlap with each other. It is the caller's responsibility to handle
/// merging this data when decoding those blocks.
///
pub fn merge(&mut self, other: &mut Self) -> Result<(), TSMError> {
if self.name != other.name {
return Err(TSMError {
@ -320,9 +321,9 @@ impl Display for MeasurementTable {
/// A partial collection of columns belonging to the same table.
///
/// A TableSection always contains a column of timestamps, which indicates how many
/// rows each column has. Each field column is the same length as the timestamp
/// column, but may contain values or NULL for each entry.
/// A TableSection always contains a column of timestamps, which indicates how
/// many rows each column has. Each field column is the same length as the
/// timestamp column, but may contain values or NULL for each entry.
///
/// Tag columns all have the same value in their column within this column set.
/// It is up to the caller to materialise these column vectors when required.
@ -370,17 +371,19 @@ pub enum ColumnData {
//
// For example, here we have three blocks (one block for a different field):
//
// ┌───────────┬───────────┐ ┌───────────┬───────────┐ ┌───────────┬───────────┐
// │ TS │ Temp │ │ TS │ Voltage │ │ TS │ Current │
// ├───────────┼───────────┤ ├───────────┼───────────┤ ├───────────┼───────────┤
// │ 1 │ 10.2 │ │ 1 │ 1.23 │ │ 2 │ 0.332 │
// ├───────────┼───────────┤ ├───────────┼───────────┤ ├───────────┼───────────┤
// │ 2 │ 11.4 │ │ 2 │ 1.24 │ │ 3 │ 0.5 │
// ├───────────┼───────────┤ ├───────────┼───────────┤ ├───────────┼───────────┤
// │ 3 │ 10.2 │ │ 3 │ 1.26 │ │ 5 │ 0.6 │
// └───────────┼───────────┘ └───────────┼───────────┘ └───────────┼───────────┘
// │ │ │
// │ │ │
// ┌───────────┬───────────┐ ┌───────────┬───────────┐
// ┌───────────┬───────────┐ │ TS │ Temp │ │ TS │
// Voltage │ │ TS │ Current │ ├───────────┼───────────┤
// ├───────────┼───────────┤ ├───────────┼───────────┤ │ 1 │ 10.2
// │ │ 1 │ 1.23 │ │ 2 │ 0.332 │
// ├───────────┼───────────┤ ├───────────┼───────────┤
// ├───────────┼───────────┤ │ 2 │ 11.4 │ │ 2 │ 1.24
// │ │ 3 │ 0.5 │ ├───────────┼───────────┤
// ├───────────┼───────────┤ ├───────────┼───────────┤ │ 3 │ 10.2
// │ │ 3 │ 1.26 │ │ 5 │ 0.6 │
// └───────────┼───────────┘ └───────────┼───────────┘
// └───────────┼───────────┘ │ │
// │ │ │ │
// └─────────────────────────────┼────────────────────────────┘
// │
// │
@ -501,7 +504,8 @@ fn map_blocks_to_columns(
vs.push(Some(*value));
blocks[i] = None;
} else {
vs.push(None); // block has a value available but timestamp doesn't join
vs.push(None); // block has a value available
// but timestamp doesn't join
}
};
}
@ -511,7 +515,8 @@ fn map_blocks_to_columns(
vs.push(Some(*value));
blocks[i] = None;
} else {
vs.push(None); // block has a value available but timestamp doesn't join
vs.push(None); // block has a value available
// but timestamp doesn't join
}
};
}
@ -521,7 +526,8 @@ fn map_blocks_to_columns(
vs.push(Some(*value));
blocks[i] = None;
} else {
vs.push(None); // block has a value available but timestamp doesn't join
vs.push(None); // block has a value available
// but timestamp doesn't join
}
};
}
@ -532,7 +538,8 @@ fn map_blocks_to_columns(
vs.push(Some(value.clone()));
blocks[i] = None;
} else {
vs.push(None); // block has a value available but timestamp doesn't join
vs.push(None); // block has a value available
// but timestamp doesn't join
}
};
}
@ -542,7 +549,8 @@ fn map_blocks_to_columns(
vs.push(Some(*value));
blocks[i] = None;
} else {
vs.push(None); // block has a value available but timestamp doesn't join
vs.push(None); // block has a value available
// but timestamp doesn't join
}
};
}

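A standalone sketch of the join the comments above describe: per-field blocks of `(timestamp, value)` pairs are merged into rows keyed by timestamp, with `None` where a field has no value at that timestamp. The crate's actual `map_blocks_to_columns` works on typed packer columns instead; the values here echo the Temp/Current example data:

use std::collections::BTreeMap;

fn join_on_time(
    temp: &[(i64, f64)],
    current: &[(i64, f64)],
) -> BTreeMap<i64, (Option<f64>, Option<f64>)> {
    let mut rows: BTreeMap<i64, (Option<f64>, Option<f64>)> = BTreeMap::new();
    for &(ts, v) in temp {
        rows.entry(ts).or_default().0 = Some(v);
    }
    for &(ts, v) in current {
        rows.entry(ts).or_default().1 = Some(v);
    }
    rows
}

fn main() {
    let rows = join_on_time(&[(1, 10.2), (2, 11.4)], &[(2, 0.332), (3, 0.5)]);
    assert_eq!(rows[&1], (Some(10.2), None)); // Temp only: Current column is NULL
    assert_eq!(rows[&2], (Some(11.4), Some(0.332)));
    assert_eq!(rows[&3], (None, Some(0.5))); // Current only: Temp column is NULL
}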
View File

@ -44,7 +44,6 @@ use std::u64;
/// }
/// }
/// ```
///
#[derive(Debug)]
pub struct TSMIndexReader<R>
where
@ -266,8 +265,8 @@ impl BlockDecoder for MockBlockDecoder {
}
}
/// `BlockData` describes the various types of block data that can be held within
/// a TSM file.
/// `BlockData` describes the various types of block data that can be held
/// within a TSM file.
#[derive(Debug, Clone, PartialEq)]
pub enum BlockData {
Float {
@ -457,9 +456,9 @@ impl BlockData {
/// Merges multiple blocks of data together.
///
/// For values within the block that have identical timestamps, `merge`
/// overwrites previous values. Therefore, in order to have "last write wins"
/// semantics it is important that the provided vector of blocks is ordered
/// by the wall-clock time the blocks were created.
/// overwrites previous values. Therefore, in order to have "last write
/// wins" semantics it is important that the provided vector of blocks
/// is ordered by the wall-clock time the blocks were created.
pub fn merge(mut blocks: Vec<Self>) -> Self {
if blocks.is_empty() {
panic!("merge called with zero blocks");
@ -552,7 +551,6 @@ impl ValuePair {
/// `TSMBlockReader` allows you to read and decode TSM blocks from within a TSM
/// file.
///
#[derive(Debug)]
pub struct TSMBlockReader<R>
where
@ -746,7 +744,9 @@ mod tests {
assert_eq!(got_blocks, 2159); // 2,159 blocks in the file
assert_eq!(got_min_time, 1_590_585_404_546_128_000); // earliest time is 2020-05-27T13:16:44.546128Z
assert_eq!(got_max_time, 1_590_597_378_379_824_000); // latest time is 2020-05-27T16:36:18.379824Z
assert_eq!(got_max_time, 1_590_597_378_379_824_000); // latest time is
// 2020-05-27T16:
// 36:18.379824Z
}
#[test]

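A standalone sketch of the "last write wins" rule described above, on plain `(timestamp, value)` pairs rather than `BlockData`: blocks later in the input overwrite earlier values at identical timestamps, which is why callers must order blocks by creation time:

use std::collections::BTreeMap;

fn merge_blocks(blocks: &[Vec<(i64, f64)>]) -> Vec<(i64, f64)> {
    let mut merged = BTreeMap::new();
    for block in blocks {
        for &(ts, v) in block {
            merged.insert(ts, v); // a later block wins on an equal timestamp
        }
    }
    merged.into_iter().collect()
}

fn main() {
    let older = vec![(1, 1.0), (2, 2.0)];
    let newer = vec![(2, 9.0), (3, 3.0)];
    assert_eq!(merge_blocks(&[older, newer]), vec![(1, 1.0), (2, 9.0), (3, 3.0)]);
}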
View File

@ -91,7 +91,8 @@ pub enum Error {
CouldNotFindColumn,
}
/// Handles buffering `ParsedLine` objects and deducing a schema from that sample
/// Handles buffering `ParsedLine` objects and deducing a schema from that
/// sample
#[derive(Debug)]
struct MeasurementSampler<'a> {
settings: ConversionSettings,
@ -116,8 +117,8 @@ struct MeasurementWriter<'a> {
}
/// Tracks the conversion state for each measurement: either in
/// "UnknownSchema" mode when the schema is still unknown or "KnownSchema" mode once
/// the schema is known.
/// "UnknownSchema" mode when the schema is still unknown or "KnownSchema" mode
/// once the schema is known.
#[derive(Debug)]
enum MeasurementConverter<'a> {
UnknownSchema(MeasurementSampler<'a>),
@ -339,7 +340,8 @@ impl<'a> MeasurementWriter<'a> {
self.write_buffer.len() >= self.settings.measurement_write_buffer_size
}
/// Buffers `ParsedLine`s (which are row-based) in preparation for column packing and writing
/// Buffers `ParsedLine`s (which are row-based) in preparation for column
/// packing and writing
pub fn buffer_line(&mut self, line: ParsedLine<'a>) -> Result<(), Error> {
if self.buffer_full() {
self.flush_buffer()?;
@ -531,8 +533,9 @@ fn pack_lines<'a>(schema: &Schema, lines: &[ParsedLine<'a>]) -> Vec<Packers> {
// }
// }
// fn write_arrow_file(parquet_schema: Schema, packers: Vec<Packers>) -> Result<(), Error> {
// let file = File::create("/tmp/http_api_requests_total.arrow").unwrap();
// fn write_arrow_file(parquet_schema: Schema, packers: Vec<Packers>) ->
// Result<(), Error> { let file =
// File::create("/tmp/http_api_requests_total.arrow").unwrap();
// let mut record_batch_fields: Vec<datatypes::Field> = vec![];
// // no default() on Field...
@ -570,29 +573,29 @@ fn pack_lines<'a>(schema: &Schema, lines: &[ParsedLine<'a>]) -> Vec<Packers> {
// }
// loop {
// let mut chunked_packers: Vec<Packers> = Vec::with_capacity(packers.len());
// for chunker in &mut packer_chunkers {
// match chunker {
// let mut chunked_packers: Vec<Packers> =
// Vec::with_capacity(packers.len()); for chunker in &mut
// packer_chunkers { match chunker {
// PackerChunker::Float(c) => {
// if let Some(chunk) = c.next() {
// chunked_packers.push(Packers::Float(Packer::from(chunk)));
// }
// }
//
// chunked_packers.push(Packers::Float(Packer::from(chunk)));
// } }
// PackerChunker::Integer(c) => {
// if let Some(chunk) = c.next() {
// chunked_packers.push(Packers::Integer(Packer::from(chunk)));
// }
// }
//
// chunked_packers.push(Packers::Integer(Packer::from(chunk)));
// } }
// PackerChunker::String(c) => {
// if let Some(chunk) = c.next() {
// chunked_packers.push(Packers::String(Packer::from(chunk)));
// }
// }
//
// chunked_packers.push(Packers::String(Packer::from(chunk)));
// } }
// PackerChunker::Boolean(c) => {
// if let Some(chunk) = c.next() {
// chunked_packers.push(Packers::Boolean(Packer::from(chunk)));
// }
// }
//
// chunked_packers.push(Packers::Boolean(Packer::from(chunk)));
// } }
// }
// }
@ -610,8 +613,8 @@ fn pack_lines<'a>(schema: &Schema, lines: &[ParsedLine<'a>]) -> Vec<Packers> {
// chunked_packers.len(),
// chunked_packers[0].num_rows()
// );
// write_arrow_batch(&mut writer, Arc::new(schema.clone()), chunked_packers);
// }
// write_arrow_batch(&mut writer, Arc::new(schema.clone()),
// chunked_packers); }
// writer.finish().unwrap();
// Ok(())
@ -627,19 +630,21 @@ fn pack_lines<'a>(schema: &Schema, lines: &[ParsedLine<'a>]) -> Vec<Packers> {
// for packer in packers {
// match packer {
// Packers::Float(p) => {
// record_batch_arrays.push(Arc::new(array::Float64Array::from(p.values().to_vec())));
// }
//
// record_batch_arrays.push(Arc::new(array::Float64Array::from(p.values().
// to_vec()))); }
// Packers::Integer(p) => {
// record_batch_arrays.push(Arc::new(array::Int64Array::from(p.values().to_vec())));
// }
//
// record_batch_arrays.push(Arc::new(array::Int64Array::from(p.values().
// to_vec()))); }
// Packers::String(p) => {
// let mut builder = array::StringBuilder::new(p.num_rows());
// for v in p.values() {
// match v {
// Some(v) => {
// builder.append_value(v.as_utf8().unwrap()).unwrap();
// }
// None => {
//
// builder.append_value(v.as_utf8().unwrap()).unwrap();
// } None => {
// builder.append_null().unwrap();
// }
// }
@ -654,8 +659,8 @@ fn pack_lines<'a>(schema: &Schema, lines: &[ParsedLine<'a>]) -> Vec<Packers> {
// }
// }
// let record_batch = record_batch::RecordBatch::try_new(schema, record_batch_arrays).unwrap();
// w.write(&record_batch).unwrap();
// let record_batch = record_batch::RecordBatch::try_new(schema,
// record_batch_arrays).unwrap(); w.write(&record_batch).unwrap();
// }
/// Converts one or more TSM files into the packers internal columnar
@ -845,15 +850,15 @@ impl TSMFileConverter {
// The processing function we supply to `process` does the following:
//
// - Append the timestamp column to the packer timestamp column
// - Materialise the same tag value for any tag key columns where the
// emitted section has a none-null value for that column.
// - Materialise NULL values for any tag key columns that we don't have
// data for in the emitted section.
// - Append the field columns to the packer field columns. The emitted
// section will already have fully materialised the data for these
// columns, including any NULL entries.
// - Materialise NULL values for any field columns that the emitted
// section does not have any data for.
// - Materialise the same tag value for any tag key columns where the emitted
// section has a none-null value for that column.
// - Materialise NULL values for any tag key columns that we don't have data
// for in the emitted section.
// - Append the field columns to the packer field columns. The emitted section
// will already have fully materialised the data for these columns, including
// any NULL entries.
// - Materialise NULL values for any field columns that the emitted section
// does not have any data for.
//
m.process(
&mut block_reader,
@ -1405,7 +1410,8 @@ mod tests {
#[test]
fn measurement_sampler_deduce_schema_multi_line_field_changed() {
// given two lines of protocol data that have apparently different data types for the field:
// given two lines of protocol data that have apparently different data types
// for the field:
let mut sampler = make_sampler_from_data(
r#"
cpu,host=A usage_system=64i 1590488773254420000
@ -1418,7 +1424,8 @@ mod tests {
.expect("Successful schema conversion");
assert_eq!(schema.measurement(), "cpu");
// Then the first field type appears in the resulting schema (TBD is this what we want??)
// Then the first field type appears in the resulting schema (TBD is this what
// we want??)
let cols = schema.get_col_defs();
println!("Converted to {:#?}", cols);
assert_eq!(cols.len(), 3);
@ -1783,12 +1790,14 @@ mod tests {
// | b | NULL | NULL | NULL | NULL | 1000 | 3000 |
// | b | NULL | NULL | NULL | NULL | 2000 | 4000 |
// | b | NULL | NULL | NULL | NULL | 3000 | 5000 |
// | NULL | east | NULL | 1.2 | 10.2 | NULL | 0000 | <-- notice series joined on ts column
// | NULL | east | NULL | 1.2 | 10.2 | NULL | 1000 | <-- notice series joined on ts column
// | NULL | east | NULL | 1.4 | 10.4 | NULL | 2000 | <-- notice series joined on ts column
// | NULL | west | a | 100.2 | NULL | NULL | 2000 |
// | NULL | west | a | 99.5 | NULL | NULL | 3000 |
// | NULL | west | a | 100.3 | NULL | NULL | 4000 |
// | NULL | east | NULL | 1.2 | 10.2 | NULL | 0000 | <-- notice
// series joined on ts column | NULL | east | NULL | 1.2 | 10.2
// | NULL | 1000 | <-- notice series joined on ts column
// | NULL | east | NULL | 1.4 | 10.4 | NULL | 2000 | <-- notice
// series joined on ts column | NULL | west | a | 100.2 | NULL
// | NULL | 2000 | | NULL | west | a | 99.5 | NULL |
// NULL | 3000 | | NULL | west | a | 100.3 | NULL | NULL |
// 4000 |
let mut table = MeasurementTable::new("cpu".to_string(), 0);
// cpu region=east temp=<all the block data for this key>

View File

@ -6,7 +6,8 @@
clippy::use_self
)]
// Export the parts of the parquet crate that are needed to interact with code in this crate
// Export the parts of the parquet crate that are needed to interact with code
// in this crate
pub use arrow_deps::parquet::{
errors::ParquetError,
file::reader::{ChunkReader, Length},

View File

@ -385,7 +385,8 @@ fn create_writer_props(
// TODO: Maybe tweak more of these settings for maximum performance.
// start off with GZIP for maximum compression ratio (at expense of CPU performance...)
// start off with GZIP for maximum compression ratio (at expense of CPU
// performance...)
builder = builder.set_compression(Compression::GZIP);
// Setup encoding as defined in
@ -446,8 +447,8 @@ fn create_writer_props(
};
}
// Even though the 'set_statistics_enabled()' method is called here, the resulting
// parquet file does not appear to have statistics enabled.
// Even though the 'set_statistics_enabled()' method is called here, the
// resulting parquet file does not appear to have statistics enabled.
//
// This is due to the fact that the underlying rust parquet
// library does not support statistics generation at this time.
@ -462,7 +463,8 @@ fn create_writer_props(
mod tests {
use super::*;
// Collapses multiple spaces into a single space, and removes trailing whitespace
// Collapses multiple spaces into a single space, and removes trailing
// whitespace
fn normalize_spaces(s: &str) -> String {
// previous non space, if any
let mut prev: Option<char> = None;

View File

@ -11,8 +11,8 @@
// datasource::TableProvider,
// execution::{
// context::ExecutionContextState,
// physical_plan::{common::RecordBatchIterator, ExecutionPlan, Partition},
// },
// physical_plan::{common::RecordBatchIterator, ExecutionPlan,
// Partition}, },
// logicalplan::{make_logical_plan_node, Expr, LogicalPlan},
// lp::LogicalPlanNode,
// optimizer::utils,
@ -42,8 +42,8 @@
// self.store.schema()
// }
// /// Perform a scan of a table and return a sequence of iterators over the data (one
// /// iterator per partition)
// /// Perform a scan of a table and return a sequence of iterators over the
// data (one /// iterator per partition)
// fn scan(
// &self,
// _projection: &Option<Vec<usize>>,
@ -114,8 +114,8 @@
// ///
// /// The following plan would be produced
// /// Projection: #env, #method, #host, #counter, #time
// /// SegmentScan: measurement projection=None predicate=: #time GtEq Int64(1590036110000000)
// ///
// /// SegmentScan: measurement projection=None predicate=: #time GtEq
// Int64(1590036110000000) ///
// fn rewrite_to_segment_scan(&self, plan: &LogicalPlan) -> LogicalPlan {
// if let LogicalPlan::Filter { predicate, input } = plan {
// // see if the input is a TableScan
@ -133,13 +133,13 @@
// .map(|input| self.rewrite_to_segment_scan(input))
// .collect();
// return utils::from_plan(plan, &utils::expressions(plan), &optimized_inputs)
// .expect("Created plan");
// return utils::from_plan(plan, &utils::expressions(plan),
// &optimized_inputs) .expect("Created plan");
// }
// }
// /// LogicalPlan node that serves as a scan of the segment store with optional predicates
// struct SegmentScan {
// /// LogicalPlan node that serves as a scan of the segment store with optional
// predicates struct SegmentScan {
// /// The underlying Store
// store: Arc<Store>,
@ -172,17 +172,17 @@
// self.schema.as_ref()
// }
// /// returns all expressions (non-recursively) in the current logical plan node.
// fn expressions(&self) -> Vec<Expr> {
// /// returns all expressions (non-recursively) in the current logical plan
// node. fn expressions(&self) -> Vec<Expr> {
// // The predicate expression gets absorbed by this node As
// // there are no inputs, there are no exprs that operate on
// // inputs
// Vec::new()
// }
// /// Write a single line human readable string to `f` for use in explain plan
// fn format_for_explain(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// write!(
// /// Write a single line human readable string to `f` for use in explain
// plan fn format_for_explain(&self, f: &mut fmt::Formatter<'_>) ->
// fmt::Result { write!(
// f,
// "SegmentScan: {:?} predicate {:?}",
// self.store.as_ref() as *const Store,
@ -196,11 +196,11 @@
// /// clone that creates a node with a known Size (i.e. Box)
// //
// fn dyn_clone(&self) -> Box<dyn LogicalPlanNode> {
// Box::new(SegmentScan::new(self.store.clone(), self.predicate.clone()))
// }
// Box::new(SegmentScan::new(self.store.clone(),
// self.predicate.clone())) }
// /// Create a clone of this LogicalPlanNode with inputs and expressions replaced.
// ///
// /// Create a clone of this LogicalPlanNode with inputs and expressions
// replaced. ///
// /// Note that exprs and inputs are in the same order as the result
// /// of self.inputs and self.exprs.
// ///
@ -212,8 +212,8 @@
// ) -> Box<dyn LogicalPlanNode> {
// assert_eq!(exprs.len(), 0, "no exprs expected");
// assert_eq!(inputs.len(), 0, "no inputs expected");
// Box::new(SegmentScan::new(self.store.clone(), self.predicate.clone()))
// }
// Box::new(SegmentScan::new(self.store.clone(),
// self.predicate.clone())) }
// /// Create the corresponding physical scheplan for this node
// fn create_physical_plan(
@ -229,8 +229,9 @@
// // hard code it here instead
// assert_eq!(
// format!("{:?}", self.predicate),
// "CAST(#time AS Int64) GtEq Int64(1590036110000000) And CAST(#time AS Int64) Lt Int64(1590040770000000) And #env Eq Utf8(\"prod01-eu-central-1\")"
// );
// "CAST(#time AS Int64) GtEq Int64(1590036110000000) And CAST(#time
// AS Int64) Lt Int64(1590040770000000) And #env Eq
// Utf8(\"prod01-eu-central-1\")" );
// let time_range = (1590036110000000, 1590040770000000);
// let string_predicate = StringPredicate {
@ -263,8 +264,8 @@
// }
// impl SegmentScanExec {
// fn new(store: Arc<Store>, time_range: (i64, i64), string_predicate: StringPredicate) -> Self {
// SegmentScanExec {
// fn new(store: Arc<Store>, time_range: (i64, i64), string_predicate:
// StringPredicate) -> Self { SegmentScanExec {
// store,
// time_range,
// string_predicate,
@ -277,8 +278,8 @@
// self.store.schema()
// }
// fn partitions(&self) -> arrow_deps::datafusion::error::Result<Vec<Arc<dyn Partitioning>>> {
// let store = self.store.clone();
// fn partitions(&self) -> arrow_deps::datafusion::error::Result<Vec<Arc<dyn
// Partitioning>>> { let store = self.store.clone();
// Ok(vec![Arc::new(SegmentPartition {
// store,
// time_range: self.time_range,
@ -297,8 +298,8 @@
// impl Partition for SegmentPartition {
// fn execute(
// &self,
// ) -> arrow_deps::datafusion::error::Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>>
// {
// ) -> arrow_deps::datafusion::error::Result<Arc<Mutex<dyn
// RecordBatchReader + Send + Sync>>> {
// let combined_results: Vec<Arc<RecordBatch>> = vec![];
// let segments = self.store.segments();

View File

@ -162,7 +162,8 @@ fn convert_record_batch(rb: RecordBatch, segment: &mut Segment) -> Result<(), Er
// TODO(edd): figure out how to get ownership here without
// cloning
// let arr: array::Float64Array = arrow::array::PrimitiveArray::from(column.data());
// let arr: array::Float64Array =
// arrow::array::PrimitiveArray::from(column.data());
// let column = Column::from(arr);
// segment.add_column(rb.schema().field(i).name(), column);
}
@ -176,7 +177,8 @@ fn convert_record_batch(rb: RecordBatch, segment: &mut Segment) -> Result<(), Er
// TODO(edd): figure out how to get ownership here without
// cloning
// let arr: array::Int64Array = arrow::array::PrimitiveArray::from(column.data());
// let arr: array::Int64Array =
// arrow::array::PrimitiveArray::from(column.data());
// let column = Column::from(arr);
// segment.add_column(rb.schema().field(i).name(), column);
}
@ -628,7 +630,8 @@ fn time_tag_keys_with_pred(store: &Store) {
}
//
// SHOW TAG VALUES ON "host", "method" WHERE time >= x and time < y AND "env" = 'prod01-us-west-1'
// SHOW TAG VALUES ON "host", "method" WHERE time >= x and time < y AND "env" =
// 'prod01-us-west-1'
fn time_tag_values_with_pred(store: &Store) {
let repeat = 10;
let mut total_time: std::time::Duration = std::time::Duration::new(0, 0);

View File

@ -240,29 +240,30 @@ pub enum AggregateType {
// match self {
// Self::Count(self_count) => match _rhs {
// Some(other_scalar) => match other_scalar {
// Scalar::String(_) => panic!("todo - remove String scalar"),
// Scalar::Float(_) => panic!("cannot add floating point value to a count"),
// Scalar::Integer(v) => Self::Count(self_count + *v as u64),
// Scalar::Unsigned32(v) => Self::Count(self_count + *v as u64),
// },
// None => self,
// Scalar::String(_) => panic!("todo - remove String
// scalar"), Scalar::Float(_) => panic!("cannot add floating
// point value to a count"), Scalar::Integer(v) =>
// Self::Count(self_count + *v as u64),
// Scalar::Unsigned32(v) => Self::Count(self_count + *v as u64),
// }, None => self,
// },
// // SUM ignores NULL values. Initially an aggregate sum is `None`, but
// // as soon as a non-null value is shown then it becomes `Some`.
// Self::Sum(self_sum) => match (self_sum, _rhs) {
// // SUM ignores NULL values. Initially an aggregate sum is `None`,
// but // as soon as a non-null value is shown then it becomes
// `Some`. Self::Sum(self_sum) => match (self_sum, _rhs) {
// (None, None) => Self::Sum(None),
// (None, Some(other_scalar)) => match other_scalar {
// Scalar::String(_) => panic!("todo - remove String scalar"),
// Scalar::Float(_) => Self::Sum(Some(other_scalar.clone())),
// Scalar::Integer(_) => Self::Sum(Some(other_scalar.clone())),
// Scalar::Unsigned32(_) => Self::Sum(Some(other_scalar.clone())),
// },
// (Some(_self), None) => Self::Sum(Some(_self.clone())),
// (Some(self_scalar), Some(other_scalar)) => match other_scalar {
// Scalar::String(_) => panic!("todo - remove String scalar"),
// Scalar::Float(_) => Self::Sum(Some(self_scalar + &other_scalar)),
// Scalar::Integer(_) => Self::Sum(Some(self_scalar + &other_scalar)),
// Scalar::Unsigned32(_) => Self::Sum(Some(self_scalar + &other_scalar)),
// Scalar::String(_) => panic!("todo - remove String
// scalar"), Scalar::Float(_) =>
// Self::Sum(Some(other_scalar.clone())), Scalar::Integer(_)
// => Self::Sum(Some(other_scalar.clone())),
// Scalar::Unsigned32(_) => Self::Sum(Some(other_scalar.clone())),
// }, (Some(_self), None) => Self::Sum(Some(_self.clone())),
// (Some(self_scalar), Some(other_scalar)) => match other_scalar
// { Scalar::String(_) => panic!("todo - remove String
// scalar"), Scalar::Float(_) => Self::Sum(Some(self_scalar
// + &other_scalar)), Scalar::Integer(_) =>
// Self::Sum(Some(self_scalar + &other_scalar)),
// Scalar::Unsigned32(_) => Self::Sum(Some(self_scalar + &other_scalar)),
// },
// },
// }
@ -281,9 +282,9 @@ pub enum AggregateType {
// panic!("can't combine count with other aggregate type");
// }
// }
// // SUM ignores NULL values. Initially an aggregate sum is `None`, but
// // as soon as a non-null value is shown then it becomes `Some`.
// Self::Sum(self_sum) => {
// // SUM ignores NULL values. Initially an aggregate sum is `None`,
// but // as soon as a non-null value is shown then it becomes
// `Some`. Self::Sum(self_sum) => {
// if let Self::Sum(other) = _rhs {
// match (self_sum, other) {
// (None, None) => Self::Sum(None),
@ -317,15 +318,15 @@ pub enum Vector {
Float(Vec<f64>),
Integer(Vec<i64>),
Unsigned32(Vec<u32>),
// TODO(edd): add types like this:
//
// Integer16(Vec<i16>),
// NullInteger16(Vec<Option<i16>>), // contains one or more NULL values
// ...
// ...
//
// We won't need EncodedString then (it can use one of the non-null integer variants)
//
/* TODO(edd): add types like this:
*
* Integer16(Vec<i16>),
* NullInteger16(Vec<Option<i16>>), // contains one or more NULL values
* ...
* ...
*
* We won't need EncodedString then (it can use one of the non-null integer variants)
*/
}
impl Vector {
@ -446,13 +447,16 @@ impl Vector {
count as u64
}
Self::Float(_) => {
(to_row_id - from_row_id) as u64 // fast - no possible NULL values
(to_row_id - from_row_id) as u64 // fast - no possible NULL
// values
}
Self::Integer(_) => {
(to_row_id - from_row_id) as u64 // fast - no possible NULL values
(to_row_id - from_row_id) as u64 // fast - no possible NULL
// values
}
Self::Unsigned32(_) => {
(to_row_id - from_row_id) as u64 // fast - no possible NULL values
(to_row_id - from_row_id) as u64 // fast - no possible NULL
// values
}
}
}
@ -927,8 +931,8 @@ impl Column {
/// Given an encoded value for a row, materialise and return the decoded
/// version.
///
/// This currently just supports decoding integer scalars back into dictionary
/// strings.
/// This currently just supports decoding integer scalars back into
/// dictionary strings.
pub fn decode_value(&self, encoded_id: i64) -> std::string::String {
match self {
Column::String(c) => {
@ -1032,7 +1036,8 @@ impl Column {
}
}
    // TODO(edd): consolidate with max_less_than... Should just be single cmp function
    // TODO(edd): consolidate with max_less_than... Should just be single cmp
    // function
pub fn min_greater_than(&self, value: &Value<'_>) -> bool {
match self {
Column::String(c) => {
@ -1505,12 +1510,12 @@ impl std::fmt::Display for String {
// self.data.sum_by_ids(row_ids)
// }
// pub fn sum_by_id_range(&self, from_row_id: usize, to_row_id: usize) -> f64 {
// self.data.sum_by_id_range(from_row_id, to_row_id)
// pub fn sum_by_id_range(&self, from_row_id: usize, to_row_id: usize) ->
// f64 { self.data.sum_by_id_range(from_row_id, to_row_id)
// }
// pub fn count_by_id_range(&self, from_row_id: usize, to_row_id: usize) -> usize {
// self.data.count_by_id_range(from_row_id, to_row_id)
// pub fn count_by_id_range(&self, from_row_id: usize, to_row_id: usize) ->
// usize { self.data.count_by_id_range(from_row_id, to_row_id)
// }
// }
@ -1540,8 +1545,9 @@ impl std::fmt::Display for String {
// }
// use arrow::array::Array;
// impl From<arrow::array::PrimitiveArray<arrow::datatypes::Float64Type>> for Float {
// fn from(arr: arrow::array::PrimitiveArray<arrow::datatypes::Float64Type>) -> Self {
// impl From<arrow::array::PrimitiveArray<arrow::datatypes::Float64Type>> for
// Float { fn from(arr:
// arrow::array::PrimitiveArray<arrow::datatypes::Float64Type>) -> Self {
// let len = arr.len();
// let mut min = std::f64::MAX;
// let mut max = std::f64::MIN;
@ -1933,7 +1939,8 @@ pub mod metadata {
size_of::<usize>() + (2 * size_of::<Option<String>>())
//
// TODO: figure out a way to specify that T must be able to describe its runtime size.
// TODO: figure out a way to specify that T must be able to
// describe its runtime size.
//
// match &self.range {
// (None, None) => base_size,
@ -1970,8 +1977,8 @@ pub mod metadata {
// }
// }
// pub fn add_repeated(&mut self, s: Option<String>, additional: usize) {
// self.num_rows += additional;
// pub fn add_repeated(&mut self, s: Option<String>, additional: usize)
// { self.num_rows += additional;
// if s < self.range.0 {
// self.range.0 = s.clone();
@ -1996,8 +2003,8 @@ pub mod metadata {
// pub fn size(&self) -> usize {
// // size of types for num_rows and range
// let base_size = size_of::<usize>() + (2 * size_of::<Option<String>>());
// match &self.range {
// let base_size = size_of::<usize>() + (2 *
// size_of::<Option<String>>()); match &self.range {
// (None, None) => base_size,
// (Some(min), None) => base_size + min.len(),
// (None, Some(max)) => base_size + max.len(),
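`decode_value` earlier in this file maps an encoded integer id back to its dictionary string. A toy sketch of that idea, assuming a plain `Vec<String>` dictionary rather than the crate's actual column representation:

```rust
/// A toy dictionary-encoded string column: distinct values are stored once
/// in `dictionary` and each row holds an index into it.
struct DictColumn {
    dictionary: Vec<String>, // sorted, unique values
    encoded: Vec<usize>,     // one entry per row
}

impl DictColumn {
    /// Materialise the logical (decoded) value for an encoded id.
    fn decode_value(&self, encoded_id: usize) -> &str {
        &self.dictionary[encoded_id]
    }

    /// Materialise the logical value stored at a row.
    fn value(&self, row_id: usize) -> &str {
        self.decode_value(self.encoded[row_id])
    }
}

fn main() {
    let col = DictColumn {
        dictionary: vec!["east".to_string(), "west".to_string()],
        encoded: vec![1, 0, 0, 1, 1],
    };
    assert_eq!(col.decode_value(1), "west");
    assert_eq!(col.value(2), "east");
}
```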

View File

@ -357,8 +357,8 @@ where
out
}
/// Return the raw encoded values for the provided logical row ids. For Plain
/// encoding this is just the decoded values.
/// Return the raw encoded values for the provided logical row ids. For
/// Plain encoding this is just the decoded values.
fn encoded_values(&self, row_ids: &[usize]) -> Vec<T> {
let mut out = Vec::with_capacity(row_ids.len());
for chunks in row_ids.chunks_exact(4) {
@ -380,7 +380,8 @@ where
/// Return all encoded values. For this encoding this is just the decoded
/// values
fn all_encoded_values(&self) -> Vec<T> {
self.values.clone() // TODO(edd):perf probably can return reference to vec.
self.values.clone() // TODO(edd):perf probably can return reference to
// vec.
}
fn scan_from(&self, _: usize) -> &[Option<Self::Item>] {
@ -683,15 +684,15 @@ impl DictionaryRLE {
// index += *other_rl as usize;
// if other_idx != idx {
// let iter: Box<dyn Iterator<Item = usize>> = Box::new(iter::empty::<usize>());
// return iter;
// let iter: Box<dyn Iterator<Item = usize>> =
// Box::new(iter::empty::<usize>()); return iter;
// }
// Box::new(start..index)
// });
// }
// // I need to return the same type as flatten_map or box the flatten_map return and this one??
// unreachable!("for now");
// // I need to return the same type as flatten_map or box the flatten_map
// return and this one?? unreachable!("for now");
// }
pub fn dictionary(&self) -> BTreeMap<Option<String>, usize> {
@ -822,8 +823,8 @@ impl DictionaryRLE {
results
}
/// Returns true if the encoding contains values other than those provided in
/// `values`.
/// Returns true if the encoding contains values other than those provided
/// in `values`.
pub fn contains_other_values(&self, values: &BTreeSet<&String>) -> bool {
let mut encoded_values = self.entry_index.len();
if self.entry_index.contains_key(&None) {
@ -911,7 +912,6 @@ impl DictionaryRLE {
/// Return the raw encoded values for the provided logical row ids.
///
/// TODO(edd): return type is wrong but I'm making it fit
///
pub fn encoded_values(&self, row_ids: &[usize]) -> Vec<u32> {
let mut out = Vec::with_capacity(row_ids.len());
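`contains_other_values` above lets a scan stop early once every distinct value in a column has already been seen. A simplified sketch of that check using owned `BTreeSet<String>`s (the crate's version works over reference sets and its RLE index):

```rust
use std::collections::BTreeSet;

/// Sketch of the `contains_other_values` idea: report whether the column's
/// distinct values include anything *not* already present in the
/// caller-provided set, so a scan can short-circuit once every value has
/// been seen.
fn contains_other_values(column_values: &BTreeSet<String>, seen: &BTreeSet<String>) -> bool {
    column_values.iter().any(|v| !seen.contains(v))
}

fn main() {
    let column: BTreeSet<String> = ["east", "west"].iter().map(|s| s.to_string()).collect();
    let seen: BTreeSet<String> = ["west"].iter().map(|s| s.to_string()).collect();
    // "east" has not been seen yet, so this column still has other values
    assert!(contains_other_values(&column, &seen));
}
```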

View File

@ -183,8 +183,9 @@ impl Segment {
column_sizes
}
// pub fn scan_column_from(&self, column_name: &str, row_id: usize) -> Option<column::Vector> {
// if let Some(i) = self.column_names().iter().position(|c| c == column_name) {
// pub fn scan_column_from(&self, column_name: &str, row_id: usize) ->
// Option<column::Vector> { if let Some(i) =
// self.column_names().iter().position(|c| c == column_name) {
// return self.columns[i].scan_from(row_id);
// }
// None
@ -341,8 +342,9 @@ impl Segment {
}
}
// now we have all the matching rows for each grouping column and each aggregation
// column. Materialised values for grouping are in encoded form.
// now we have all the matching rows for each grouping column and each
// aggregation column. Materialised values for grouping are in encoded
// form.
//
// Next we iterate all rows in all columns and create a hash entry with
// running aggregates.
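The comment above describes materialising a group key per row and keeping running aggregates in a hash map. A stand-alone sketch of that shape, assuming already-decoded `i64` group columns and a single SUM aggregate (the real segment code works on encoded values and supports several aggregate types):

```rust
use std::collections::HashMap;

/// Group rows by the values in `group_cols` and keep a running SUM of
/// `agg_col` per group key. Simplified sketch only.
fn hash_group_sum(group_cols: &[Vec<i64>], agg_col: &[i64]) -> HashMap<Vec<i64>, i64> {
    let mut groups: HashMap<Vec<i64>, i64> = HashMap::new();
    for row in 0..agg_col.len() {
        // materialise the group key for this row from every grouping column
        let key: Vec<i64> = group_cols.iter().map(|col| col[row]).collect();
        // update the running aggregate for that key
        *groups.entry(key).or_insert(0) += agg_col[row];
    }
    groups
}

fn main() {
    let region = vec![0, 0, 1, 1]; // e.g. encoded "east"/"west"
    let method = vec![7, 8, 7, 7]; // e.g. encoded "GET"/"POST"
    let counter = vec![10, 20, 30, 40];
    let out = hash_group_sum(&[region, method], &counter);
    assert_eq!(out[&vec![0, 7]], 10);
    assert_eq!(out[&vec![1, 7]], 70);
}
```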
@ -395,7 +397,8 @@ impl Segment {
"something broken with grouping! Either processed None or wrong type"
);
}
// The double Some is ok because encoded values are always non-null
// The double Some is ok because encoded values are always
// non-null
} else if let Some(Some(column::Scalar::Unsigned32(v))) = itr.next() {
group_key[i] = v as i64
} else {
@ -719,7 +722,8 @@ impl Segment {
"something broken with grouping! Either processed None or wrong type"
);
}
// the double some should be ok as encoded values can never be None
// the double some should be ok as encoded values can never be
// None
} else if let Some(Some(column::Scalar::Unsigned32(v))) = itr.next() {
v as i64
} else {
@ -892,8 +896,8 @@ impl Segment {
Some(bm)
}
// in this case the complete time range of segment covered so no need to intersect
// on time.
// in this case the complete time range of segment covered so no need to
// intersect on time.
//
// We return an &Option here because we don't want to move the read-only
// meta row_ids bitmap.
@ -1179,9 +1183,9 @@ pub struct SegmentMetaData {
column_names: Vec<String>,
time_range: (i64, i64),
// row_ids is a bitmap containing all row ids.
// row_ids: croaring::Bitmap,
// TODO column sort order
/* row_ids is a bitmap containing all row ids.
* row_ids: croaring::Bitmap,
* TODO column sort order */
}
impl SegmentMetaData {
@ -1594,10 +1598,11 @@ impl<'a> Segments<'a> {
/// Returns the first value for a column in a set of segments.
///
/// The first value is based on the time column, therefore the returned value
/// may not be at the end of the column.
/// The first value is based on the time column, therefore the returned
/// value may not be at the end of the column.
///
    /// If the time column has multiple max time values then the result is arbitrary.
    /// If the time column has multiple max time values then the result is
    /// arbitrary.
///
/// TODO(edd): could return NULL value..
pub fn first(&self, column_name: &str) -> Option<(i64, column::Value<'_>, usize)> {
@ -1629,7 +1634,8 @@ impl<'a> Segments<'a> {
/// The last value is based on the time column, therefore the returned value
/// may not be at the end of the column.
///
/// If the time column has multiple max time values then the result is undefined.
/// If the time column has multiple max time values then the result is
/// undefined.
///
/// TODO(edd): could return NULL value..
pub fn last(&self, column_name: &str) -> Option<(i64, column::Value<'_>, usize)> {
@ -1656,8 +1662,8 @@ impl<'a> Segments<'a> {
}
}
/// Returns the distinct set of tag keys (column names) matching the provided
/// predicates and time range.
/// Returns the distinct set of tag keys (column names) matching the
/// provided predicates and time range.
pub fn tag_keys(
&self,
time_range: (i64, i64),
@ -1779,7 +1785,8 @@ mod test {
(vec!["env", "role", "foo"], false), // group key contains non-sorted col
(vec!["env", "role", "time"], false), // time may be out of order due to path column
(vec!["env", "role", "path", "time"], true),
(vec!["env", "role", "path", "time", "foo"], false), // group key contains non-sorted col
(vec!["env", "role", "path", "time", "foo"], false), /* group key contains
* non-sorted col */
(vec!["env", "path", "role"], true), // order of columns in group key does not matter
];
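The `first`/`last` docs earlier in this file pick the row with the minimum or maximum timestamp, not the first or last physical row. A minimal sketch of the `first` case over one segment's worth of parallel time/value vectors:

```rust
/// Return (time, value, row_id) for the row with the smallest timestamp.
/// Sketch only: the real code works across segments and typed columns, and
/// the choice is arbitrary when several rows share the minimum time.
fn first<'a>(time: &[i64], values: &'a [&'a str]) -> Option<(i64, &'a str, usize)> {
    time.iter()
        .enumerate()
        .min_by_key(|(_, t)| **t)
        .map(|(row_id, t)| (*t, values[row_id], row_id))
}

fn main() {
    let time = vec![300, 100, 200];
    let values = vec!["c", "a", "b"];
    assert_eq!(first(&time, &values), Some((100, "a", 1)));
}
```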

View File

@ -13,7 +13,6 @@
//! sorted.
//!
//! Secondly, the sort produced using this partitioning scheme is not stable.
//!
use std::cmp::Ordering;
use std::collections::BTreeSet;
use std::ops::Range;

View File

@ -9,8 +9,9 @@
//! # object_store
//!
//! This crate provides APIs for interacting with object storage services. It currently supports
//! PUT, GET, DELETE, and list for Google Cloud Storage, Amazon S3, in-memory and local file storage.
//! This crate provides APIs for interacting with object storage services. It
//! currently supports PUT, GET, DELETE, and list for Google Cloud Storage,
//! Amazon S3, in-memory and local file storage.
//!
//! Future compatibility will include Azure Blob Storage, Minio, and Ceph.
@ -259,13 +260,15 @@ impl fmt::Debug for AmazonS3 {
}
impl AmazonS3 {
/// Configure a connection to Amazon S3 in the specified Amazon region and bucket. Uses
/// [`rusoto_credential::ChainProvider`][cp] to check for credentials in:
/// Configure a connection to Amazon S3 in the specified Amazon region and
/// bucket. Uses [`rusoto_credential::ChainProvider`][cp] to check for
/// credentials in:
///
/// 1. Environment variables: `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`
/// 2. `credential_process` command in the AWS config file, usually located at `~/.aws/config`.
/// 3. AWS credentials file. Usually located at `~/.aws/credentials`.
/// 4. IAM instance profile. Will only work if running on an EC2 instance with an instance
/// 1. Environment variables: `AWS_ACCESS_KEY_ID` and
/// `AWS_SECRET_ACCESS_KEY` 2. `credential_process` command in the AWS
/// config file, usually located at `~/.aws/config`. 3. AWS credentials
/// file. Usually located at `~/.aws/credentials`. 4. IAM instance
/// profile. Will only work if running on an EC2 instance with an instance
/// profile/role.
///
/// [cp]: https://docs.rs/rusoto_credential/0.43.0/rusoto_credential/struct.ChainProvider.html
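For reference, a hedged sketch of wiring an S3 client to the `ChainProvider` lookup order described above, using the rusoto crates of that era; constructor names and error types may differ between rusoto versions, so this is illustrative rather than the crate's exact setup code.

```rust
// Sketch only: construct an S3 client whose credentials are resolved by
// rusoto's ChainProvider (env vars, AWS config/credentials files, then the
// instance profile), as described in the list above.
use rusoto_core::{HttpClient, Region};
use rusoto_credential::ChainProvider;
use rusoto_s3::S3Client;

fn example_s3_client() -> Result<S3Client, Box<dyn std::error::Error>> {
    let dispatcher = HttpClient::new()?;
    let credentials = ChainProvider::new();
    Ok(S3Client::new_with(dispatcher, credentials, Region::UsWest2))
}
```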
@ -400,8 +403,9 @@ impl AmazonS3 {
let names = contents.into_iter().flat_map(|object| object.key).collect();
// The AWS response contains a field named `is_truncated` as well as
// `next_continuation_token`, and we're assuming that `next_continuation_token` is only
// set when `is_truncated` is true (and therefore not checking `is_truncated`).
// `next_continuation_token`, and we're assuming that `next_continuation_token`
// is only set when `is_truncated` is true (and therefore not
// checking `is_truncated`).
let next_state = if let Some(next_continuation_token) = resp.next_continuation_token {
ListState::HasMore(next_continuation_token)
} else {
@ -413,7 +417,8 @@ impl AmazonS3 {
}
}
/// In-memory storage suitable for testing or for opting out of using a cloud storage provider.
/// In-memory storage suitable for testing or for opting out of using a cloud
/// storage provider.
#[derive(Debug, Default)]
pub struct InMemory {
storage: RwLock<BTreeMap<String, Bytes>>,
@ -506,7 +511,8 @@ impl InMemory {
}
}
/// Local filesystem storage suitable for testing or for opting out of using a cloud storage provider.
/// Local filesystem storage suitable for testing or for opting out of using a
/// cloud storage provider.
#[derive(Debug)]
pub struct File {
root: PathBuf,

View File

@ -13,7 +13,6 @@
//! sorted.
//!
//! Secondly, the sort produced using this partitioning scheme is not stable.
//!
use std::cmp::Ordering;
use std::collections::BTreeSet;
use std::ops::Range;

View File

@ -115,8 +115,8 @@ impl<E> From<Result<StringSetRef, E>> for StringSetPlan
where
E: std::error::Error + Send + Sync + 'static,
{
/// Create a StringSetPlan from a Result<StringSetRef> result, wrapping the error type
/// appropriately
/// Create a StringSetPlan from a Result<StringSetRef> result, wrapping the
/// error type appropriately
fn from(result: Result<StringSetRef, E>) -> Self {
match result {
Ok(set) => Self::Known(Ok(set)),

View File

@ -376,7 +376,8 @@ mod tests {
last_timestamp: 5000,
};
    // use something that has a later timestamp and expect the later one takes precedence
    // use something that has a later timestamp and expect the later one takes
    // precedence
let l2 = FieldList {
fields: vec![field1_later.clone()],
};
@ -399,7 +400,8 @@ mod tests {
last_timestamp: 5000,
};
// use something that has a later timestamp and expect the later one takes precedence
// use something that has a later timestamp and expect the later one takes
// precedence
let l3 = FieldList {
fields: vec![field1_new_type],
};
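The assertions above rely on a merge rule where, for a duplicated field name, the entry with the later `last_timestamp` wins. A small sketch of that rule with simplified stand-ins for the crate's `Field`/`FieldList` types:

```rust
use std::collections::BTreeMap;

/// Simplified stand-in for a field entry.
#[derive(Clone, Debug, PartialEq)]
struct Field {
    name: String,
    last_timestamp: i64,
}

/// Merge two lists of fields; when a name appears in both, the entry with
/// the later `last_timestamp` takes precedence.
fn merge(a: Vec<Field>, b: Vec<Field>) -> Vec<Field> {
    let mut by_name: BTreeMap<String, Field> = BTreeMap::new();
    for f in a.into_iter().chain(b) {
        let replace = by_name
            .get(&f.name)
            .map_or(true, |existing| f.last_timestamp > existing.last_timestamp);
        if replace {
            by_name.insert(f.name.clone(), f);
        }
    }
    by_name.into_iter().map(|(_, f)| f).collect()
}

fn main() {
    let early = Field { name: "field1".into(), last_timestamp: 100 };
    let later = Field { name: "field1".into(), last_timestamp: 5000 };
    let merged = merge(vec![early], vec![later.clone()]);
    assert_eq!(merged, vec![later]);
}
```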

View File

@ -1,4 +1,5 @@
//! This module contains plumbing to connect InfluxDB IOx extensions to DataFusion
//! This module contains plumbing to connect InfluxDB IOx extensions to
//! DataFusion
use std::sync::Arc;
@ -128,8 +129,8 @@ impl IOxExecutionContext {
self.inner.collect(physical_plan).await
}
/// Executes the physical plan and produces a RecordBatchStream to stream over the result
/// that iterates over the results.
/// Executes the physical plan and produces a RecordBatchStream to stream
/// over the result that iterates over the results.
pub async fn execute(
&self,
physical_plan: Arc<dyn ExecutionPlan>,
@ -164,10 +165,10 @@ fn window_bounds(
// further optimized, which we leave as an exercise to our future
// selves
// `args` and output are dynamically-typed Arrow arrays, which means that we need to:
// 1. cast the values to the type we want
// 2. perform the window_bounds calculation for every element in the timestamp array
// 3. construct the resulting array
// `args` and output are dynamically-typed Arrow arrays, which means that we
// need to: 1. cast the values to the type we want
// 2. perform the window_bounds calculation for every element in the timestamp
// array 3. construct the resulting array
// this is guaranteed by DataFusion based on the function's signature.
assert_eq!(args.len(), 1);
@ -177,8 +178,8 @@ fn window_bounds(
.downcast_ref::<Int64Array>()
.expect("cast of time failed");
// Note: the Go code uses the `Stop` field of the `GetEarliestBounds` call as the window boundary
// https://github.com/influxdata/influxdb/blob/master/storage/reads/array_cursor.gen.go#L546
// Note: the Go code uses the `Stop` field of the `GetEarliestBounds` call as
// the window boundary https://github.com/influxdata/influxdb/blob/master/storage/reads/array_cursor.gen.go#L546
// Note window doesn't use the period argument
let period = window::Duration::from_nsecs(0);
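The comment above spells out the downcast, compute-per-element, rebuild pattern for a vectorised function. A simplified sketch of that pattern over an arrow `Int64Array`, using a plain fixed-width window in place of the crate's `window::Duration`/`GetEarliestBounds` semantics (so the arithmetic here is an assumption, not the real bounds logic):

```rust
use arrow::array::{Array, Int64Array};

/// For each input timestamp (nanoseconds), return the exclusive upper bound
/// of the fixed-width window containing it. Sketch of the
/// "cast, compute per element, rebuild an array" pattern described above.
fn window_stop(times: &dyn Array, every_ns: i64) -> Int64Array {
    // 1. cast (downcast) the dynamically typed array to the concrete type
    let times = times
        .as_any()
        .downcast_ref::<Int64Array>()
        .expect("time column must be Int64");

    // 2. + 3. compute the bound for every element and collect the results
    // into a new Int64Array
    (0..times.len())
        .map(|i| {
            if times.is_null(i) {
                None
            } else {
                let t = times.value(i);
                Some(t - t.rem_euclid(every_ns) + every_ns)
            }
        })
        .collect()
}

fn main() {
    let input = Int64Array::from(vec![0, 1, 250, 499, 500]);
    let stops = window_stop(&input, 500);
    assert_eq!(stops.value(0), 500);
    assert_eq!(stops.value(4), 1000);
}
```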

View File

@ -496,7 +496,8 @@ mod tests {
to_stringset(self.expected_output)
}
/// run the input batches through a schema pivot and return the results as a StringSetRef
/// run the input batches through a schema pivot and return the results
/// as a StringSetRef
async fn pivot(&self) -> StringSetRef {
let schema = Self::input_schema();

View File

@ -1,12 +1,13 @@
//! This module contains the definition of a "SeriesSet" a plan that when run produces
//! rows that can be logically divided into "Series"
//! This module contains the definition of a "SeriesSet" a plan that when run
//! produces rows that can be logically divided into "Series"
//!
//! Specifically, what this plan produces represents a set of "tables",
//! and each table is sorted on a set of "tag" columns, meaning the
//! data for groups / series will be contiguous.
//!
//! For example, the output columns of such a plan would be:
//! (tag col0) (tag col1) ... (tag colN) (field val1) (field val2) ... (field valN) .. (timestamps)
//! (tag col0) (tag col1) ... (tag colN) (field val1) (field val2) ... (field
//! valN) .. (timestamps)
//!
//! Note that the data will come out ordered by the tag keys (ORDER BY
//! (tag col0) (tag col1) ... (tag colN))
@ -849,8 +850,8 @@ mod tests {
results
}
/// Test helper: parses the csv content into a single record batch arrow arrays
/// columnar ArrayRef according to the schema
/// Test helper: parses the csv content into a single record batch arrow
/// arrays columnar ArrayRef according to the schema
fn parse_to_record_batch(schema: SchemaRef, data: &str) -> RecordBatch {
let has_header = false;
let delimiter = Some(b',');

View File

@ -22,8 +22,8 @@ pub enum Error {
InvalidId { source: ParseIntError },
}
/// ID_LENGTH is the exact length a string (or a byte slice representing it) must have in order to
/// be decoded into a valid ID.
/// ID_LENGTH is the exact length a string (or a byte slice representing it)
/// must have in order to be decoded into a valid ID.
const ID_LENGTH: usize = 16;
/// Id is a unique identifier.
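Given the fixed `ID_LENGTH` of 16 and the `ParseIntError` source above, decoding is presumably a hex-string-to-`u64` conversion; a hedged sketch of that, not necessarily the crate's exact implementation:

```rust
use std::num::ParseIntError;

const ID_LENGTH: usize = 16;

/// Decode a 16-character hex string into a u64 id. Sketch only: the real
/// type wraps the id and has richer error variants.
fn decode_id(s: &str) -> Result<u64, String> {
    if s.len() != ID_LENGTH {
        return Err(format!("id must be {} characters, got {}", ID_LENGTH, s.len()));
    }
    u64::from_str_radix(s, 16).map_err(|e: ParseIntError| e.to_string())
}

fn main() {
    assert_eq!(decode_id("0000111100001111"), Ok(0x0000_1111_0000_1111));
    assert!(decode_id("short").is_err());
}
```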

View File

@ -52,7 +52,8 @@ pub trait TSDatabase: Debug + Send + Sync {
/// writes parsed lines into this database
async fn write_lines(&self, lines: &[ParsedLine<'_>]) -> Result<(), Self::Error>;
/// Stores the replicated write in the write buffer and, if enabled, the write ahead log.
/// Stores the replicated write in the write buffer and, if enabled, the
/// write ahead log.
async fn store_replicated_write(&self, write: &ReplicatedWrite) -> Result<(), Self::Error>;
/// Returns a plan that lists the names of tables in this
@ -111,7 +112,8 @@ pub trait SQLDatabase: Debug + Send + Sync {
type Partition: Send + Sync + 'static + PartitionChunk;
type Error: std::error::Error + Send + Sync + 'static;
/// Execute the specified query and return arrow record batches with the result
/// Execute the specified query and return arrow record batches with the
/// result
async fn query(&self, query: &str) -> Result<Vec<RecordBatch>, Self::Error>;
/// Fetch the specified table names and columns as Arrow

View File

@ -131,7 +131,8 @@ impl PredicateBuilder {
/// Sets table name restrictions
pub fn tables(mut self, tables: Vec<String>) -> Self {
// We need to distinguish predicates like `table_name In
// (foo, bar)` and `table_name = foo and table_name = bar` in order to handle this
// (foo, bar)` and `table_name = foo and table_name = bar` in order to handle
// this
assert!(
self.inner.table_names.is_none(),
"Multiple table predicate specification not yet supported"
@ -145,7 +146,8 @@ impl PredicateBuilder {
/// Sets field_column restriction
pub fn field_columns(mut self, columns: Vec<String>) -> Self {
// We need to distinguish predicates like `column_name In
// (foo, bar)` and `column_name = foo and column_name = bar` in order to handle this
// (foo, bar)` and `column_name = foo and column_name = bar` in order to handle
// this
if self.inner.field_columns.is_some() {
unimplemented!("Complex/Multi field predicates are not yet supported");
}
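The distinction called out in these hunks matters because the two shapes mean different things: `IN (foo, bar)` is a disjunction over one column, while `= foo AND = bar` on the same column can never match more than one distinct value. A tiny illustration:

```rust
/// `name IN (foo, bar)` is an OR over the candidate values...
fn matches_in(name: &str, candidates: &[&str]) -> bool {
    candidates.iter().any(|c| *c == name)
}

/// ...whereas `name = foo AND name = bar` requires one value to equal every
/// candidate simultaneously, which is unsatisfiable for two distinct
/// candidates.
fn matches_and_of_equals(name: &str, candidates: &[&str]) -> bool {
    candidates.iter().all(|c| *c == name)
}

fn main() {
    let candidates = ["foo", "bar"];
    assert!(matches_in("foo", &candidates));
    assert!(!matches_and_of_equals("foo", &candidates));
}
```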

View File

@ -1,5 +1,5 @@
//! This module provides a reference implementation of `query::DatabaseSource` and
//! `query::Database` for use in testing.
//! This module provides a reference implementation of `query::DatabaseSource`
//! and `query::Database` for use in testing.
use arrow_deps::arrow::record_batch::RecordBatch;
@ -139,7 +139,8 @@ impl TestDatabase {
.expect("writing lines");
}
/// Set the list of column names that will be returned on a call to column_names
/// Set the list of column names that will be returned on a call to
/// column_names
pub async fn set_column_names(&self, column_names: Vec<String>) {
let column_names = column_names.into_iter().collect::<StringSet>();
let column_names = Arc::new(column_names);
@ -152,7 +153,8 @@ impl TestDatabase {
self.column_names_request.clone().lock().await.take()
}
/// Set the list of column values that will be returned on a call to column_values
/// Set the list of column values that will be returned on a call to
/// column_values
pub async fn set_column_values(&self, column_values: Vec<String>) {
let column_values = column_values.into_iter().collect::<StringSet>();
let column_values = Arc::new(column_values);
@ -403,7 +405,8 @@ impl SQLDatabase for TestDatabase {
type Partition = TestPartition;
type Error = TestError;
/// Execute the specified query and return arrow record batches with the result
/// Execute the specified query and return arrow record batches with the
/// result
async fn query(&self, _query: &str) -> Result<Vec<RecordBatch>, Self::Error> {
unimplemented!("query Not yet implemented");
}

View File

@ -55,7 +55,8 @@ impl Duration {
}
}
/// create a duration from a non negative value of months and a negative flag
/// create a duration from a non negative value of months and a negative
/// flag
pub fn from_months_with_negative(months: i64, negative: bool) -> Self {
assert_eq!(months < 0, negative);
Self {
@ -590,11 +591,13 @@ mod tests {
#[test]
#[should_panic]
fn test_timestamp_to_datetime_negative() {
// Note while testing to make sure a negative timestamp doesn't overflow, it turns out
// that the chrono library itself didn't handle parsing negative timestamps:
// Note while testing to make sure a negative timestamp doesn't overflow, it
// turns out that the chrono library itself didn't handle parsing
// negative timestamps:
//
// thread 'window::tests::test_timestamp_to_datetime' panicked at 'invalid or out-of-range datetime',
//src/github.com-1ecc6299db9ec823/chrono-0.4.19/src/naive/datetime.rs:117:18
// thread 'window::tests::test_timestamp_to_datetime' panicked at 'invalid or
// out-of-range datetime', src/github.com-1ecc6299db9ec823/chrono-0.4.
// 19/src/naive/datetime.rs:117:18
assert_eq!(timestamp_to_datetime(-1568756160).to_rfc3339(), "foo");
}
}

View File

@ -92,9 +92,9 @@ fn benchmark_read_group_pre_computed_groups_no_predicates_vary_cardinality(
// This benchmark tracks the performance of read_group when it is able to use
// the per-group bitsets provided by RLE columns. This benchmark seeks to
// understand the performance of working on these bitsets directly in the segment
// when the size of these bitsets changes. Therefore the cardinality (groups)
// is fixed, but the number of rows in the column is varied.
// understand the performance of working on these bitsets directly in the
// segment when the size of these bitsets changes. Therefore the cardinality
// (groups) is fixed, but the number of rows in the column is varied.
fn benchmark_read_group_pre_computed_groups_no_predicates_vary_rows(
c: &mut Criterion,
benchmark_group_name: &str,
@ -320,7 +320,8 @@ fn generate_trace_for_segment(
let normal = Normal::new(10.0, 5.0).unwrap();
let node_id_prefix = format!("{}-{}-{}", env_value, data_centre_value, cluster_value,);
for _ in 0..spans_per_trace {
// these values are not the same for each span so need to be generated separately.
// these values are not the same for each span so need to be generated
// separately.
let node_id = rng.gen_range(0, 10); // cardinality is 2 * 10 * 10 * 10 = 2,000
let node_id = format!("node_id-{}-{}", node_id_prefix, node_id);

View File

@ -262,7 +262,6 @@ impl Column {
//
/// Determine the set of row ids that satisfy the predicate.
///
pub fn row_ids_filter(
&self,
op: &cmp::Operator,
@ -406,14 +405,13 @@ impl Column {
}
}
// breaking this down:
// * Extract a Scalar variant from `value`, which should panic if
// that's not possible;
// * Try to safely convert that scalar to a primitive value based
// on the logical type used for the metadata on the column.
// * If the value can't be safely converted then there is no way
// that said value could be stored in the column at all -> false.
// * Otherwise if the value falls inside the range of values in
// the column range then it may be in the column -> true.
// * Extract a Scalar variant from `value`, which should panic if that's not possible;
// * Try to safely convert that scalar to a primitive value based on the logical type
// used for the metadata on the column.
// * If the value can't be safely converted then there is no way that said value could
// be stored in the column at all -> false.
// * Otherwise if the value falls inside the range of values in the column range then
// it may be in the column -> true.
Column::Float(meta, _) => value
.scalar()
.try_as_f64()
@ -445,17 +443,14 @@ impl Column {
}
}
// breaking this down:
// * If the column contains null values then it's not possible for
// all values in the column to match the predicate.
// * Extract a Scalar variant from `value`, which should panic if
// that's not possible;
// * Try to safely convert that scalar to a primitive value based
// on the logical type used for the metadata on the column.
// * If the column contains null values then it's not possible for all values in the
// column to match the predicate.
// * Extract a Scalar variant from `value`, which should panic if that's not possible;
// * Try to safely convert that scalar to a primitive value based on the logical type
// used for the metadata on the column.
// * If the value can't be safely converted then -> false.
// * Otherwise if the value falls inside the range of values in
// the column range then check if all values satisfy the
// predicate.
//
// * Otherwise if the value falls inside the range of values in the column range then
// check if all values satisfy the predicate.
Column::Float(meta, data) => {
if data.contains_null() {
return false;
@ -503,12 +498,10 @@ impl Column {
}
}
// breaking this down:
// * Extract a Scalar variant from `value`, which should panic if
// that's not possible;
// * Convert that scalar to a primitive value based
// on the logical type used for the metadata on the column.
// * Extract a Scalar variant from `value`, which should panic if that's not possible;
// * Convert that scalar to a primitive value based on the logical type used for the
// metadata on the column.
// * See if one can prove none of the column can match the predicate.
//
Column::Float(meta, data) => meta.match_no_values(op, value.scalar().as_f64()),
Column::Integer(meta, data) => meta.match_no_values(op, value.scalar().as_i64()),
Column::Unsigned(meta, data) => meta.match_no_values(op, value.scalar().as_u64()),
@ -583,7 +576,8 @@ impl Column {
// Methods for inspecting
//
/// Determines if the column has a non-null value at any of the provided rows.
/// Determines if the column has a non-null value at any of the provided
/// rows.
pub fn has_non_null_value(&self, row_ids: &[u32]) -> bool {
todo!()
}
@ -1101,8 +1095,8 @@ impl IntegerEncoding {
/// Returns the logical values found at the provided row ids.
///
/// TODO(edd): perf - provide a pooling mechanism for these destination vectors
/// so that they can be re-used.
/// TODO(edd): perf - provide a pooling mechanism for these destination
/// vectors so that they can be re-used.
pub fn values(&self, row_ids: &[u32]) -> Values<'_> {
match &self {
// signed 64-bit variants - logical type is i64 for all these
@ -1126,8 +1120,8 @@ impl IntegerEncoding {
/// Returns all logical values in the column.
///
/// TODO(edd): perf - provide a pooling mechanism for these destination vectors
/// so that they can be re-used.
/// TODO(edd): perf - provide a pooling mechanism for these destination
/// vectors so that they can be re-used.
pub fn all_values(&self) -> Values<'_> {
match &self {
// signed 64-bit variants - logical type is i64 for all these
@ -1988,10 +1982,10 @@ pub enum AggregateType {
Min,
Max,
Sum,
// TODO - support:
// Distinct - (edd): not sure this counts as an aggregations. Seems more like a special filter.
// CountDistinct
// Percentile
/* TODO - support:
* Distinct - (edd): not sure this counts as an aggregations. Seems more like a special
* filter. CountDistinct
* Percentile */
}
impl std::fmt::Display for AggregateType {
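The bullet lists above describe min/max range pruning: convert the predicate's scalar to the column's logical type, then compare it against the column's stored range to decide whether the column could possibly contain a match. A simplified sketch of that check for an i64 column:

```rust
enum Operator {
    Equal,
    GT,
    LT,
}

/// Simplified column metadata: the (min, max) range of values stored.
struct Meta {
    range: (i64, i64),
}

impl Meta {
    /// Could *any* value in the column satisfy `op value`? A `true` here
    /// only means "maybe"; a `false` lets the caller skip the column
    /// entirely without scanning it.
    fn might_contain_value(&self, op: Operator, value: i64) -> bool {
        let (min, max) = self.range;
        match op {
            Operator::Equal => min <= value && value <= max,
            Operator::GT => max > value,
            Operator::LT => min < value,
        }
    }
}

fn main() {
    let meta = Meta { range: (100, 200) };
    assert!(meta.might_contain_value(Operator::Equal, 150));
    assert!(!meta.might_contain_value(Operator::Equal, 250));
    assert!(!meta.might_contain_value(Operator::GT, 200)); // nothing exceeds the max
}
```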

View File

@ -205,7 +205,6 @@ impl Encoding {
/// the column.
///
/// NULL values are represented by None.
///
fn all_values<'a>(&'a mut self, dst: Vec<Option<&'a str>>) -> Vec<Option<&'a str>> {
match self {
Encoding::RLE(enc) => enc.all_values(dst),
@ -238,7 +237,6 @@ impl Encoding {
/// Return the raw encoded values for the provided logical row ids.
/// Encoded values for NULL values are included.
///
fn encoded_values(&self, row_ids: &[u32], dst: Vec<u32>) -> Vec<u32> {
match self {
Encoding::RLE(enc) => enc.encoded_values(row_ids, dst),
@ -276,7 +274,6 @@ impl Encoding {
/// values. By exposing the current result set to each column (as an
/// argument to `contains_other_values`) columns can be short-circuited when
/// they only contain values that have already been discovered.
///
fn contains_other_values(&self, values: &BTreeSet<Option<&String>>) -> bool {
match self {
Encoding::RLE(enc) => enc.contains_other_values(values),
@ -541,7 +538,8 @@ mod test {
name
);
// The encoding also supports comparisons on values that don't directly exist in the column.
// The encoding also supports comparisons on values that don't directly exist in
// the column.
let ids = enc.row_ids_filter(&"abba", &cmp::Operator::GT, RowIDs::Vector(vec![]));
assert_eq!(
ids,

View File

@ -576,7 +576,6 @@ impl Plain {
/// the column.
///
/// NULL values are represented by None.
///
pub fn all_values<'a>(&'a self, mut dst: Vec<Option<&'a str>>) -> Vec<Option<&'a str>> {
dst.clear();
dst.reserve(self.entries.len());
@ -620,7 +619,6 @@ impl Plain {
/// Return the raw encoded values for the provided logical row ids.
/// Encoded values for NULL values are included.
///
pub fn encoded_values(&self, row_ids: &[u32], mut dst: Vec<u32>) -> Vec<u32> {
dst.clear();
dst.reserve(row_ids.len());
@ -662,7 +660,6 @@ impl Plain {
/// values. By exposing the current result set to each column (as an
/// argument to `contains_other_values`) columns can be short-circuited when
/// they only contain values that have already been discovered.
///
pub fn contains_other_values(&self, values: &BTreeSet<Option<&String>>) -> bool {
todo!()
}

View File

@ -616,7 +616,6 @@ impl RLE {
/// the column.
///
/// NULL values are represented by None.
///
pub fn all_values<'a>(&'a self, mut dst: Vec<Option<&'a str>>) -> Vec<Option<&'a str>> {
dst.clear();
dst.reserve(self.num_rows as usize);
@ -711,7 +710,6 @@ impl RLE {
/// Return the raw encoded values for the provided logical row ids.
/// Encoded values for NULL values are included.
///
pub fn encoded_values(&self, row_ids: &[u32], mut dst: Vec<u32>) -> Vec<u32> {
dst.clear();
dst.reserve(row_ids.len());
@ -775,7 +773,6 @@ impl RLE {
/// values. By exposing the current result set to each column (as an
/// argument to `contains_other_values`) columns can be short-circuited when
/// they only contain values that have already been discovered.
///
pub fn contains_other_values(&self, values: &BTreeSet<Option<&String>>) -> bool {
let mut encoded_values = self.index_entries.len();
if !self.contains_null {

View File

@ -3,10 +3,10 @@
//! This encoding stores a column of fixed-width numerical values potentially
//! using a smaller physical type in memory than the provided logical type.
//!
//! For example, if you have a column with 64-bit integers: [122, 232, 33, 0, -12]
//! then you can reduce the space needed to store them, by converting them as a
//! `Vec<i8>` instead of a `Vec<i64>`. In this case, this reduces the size of
//! the column data by 87.5% and generally should increase throughput of
//! For example, if you have a column with 64-bit integers: [122, 232, 33, 0,
//! -12] then you can reduce the space needed to store them, by converting them
//! as a `Vec<i8>` instead of a `Vec<i64>`. In this case, this reduces the size
//! of the column data by 87.5% and generally should increase throughput of
//! operations on the column data.
//!
//! The encodings within this module do not concern themselves with choosing the
@ -27,17 +27,16 @@ use crate::column::{cmp, RowIDs};
///
/// For a given logical datatype `U`, `Fixed` encodings can store values with
/// a different datatype `T`, where `size_of::<T>() <= size_of::<U>()`.
///
pub struct Fixed<T>
where
T: PartialOrd + std::fmt::Debug,
{
// backing data
values: Vec<T>,
// TODO(edd): future optimisation to stop filtering early.
// total_order can be used as a hint to stop scanning the column early when
// applying a comparison predicate to the column.
// total_order: bool,
/* TODO(edd): future optimisation to stop filtering early.
* total_order can be used as a hint to stop scanning the column early when
* applying a comparison predicate to the column.
* total_order: bool, */
}
impl<T> std::fmt::Display for Fixed<T>
@ -133,10 +132,10 @@ where
/// Returns the logical (decoded) values for all the rows in the column.
///
/// `all_values` materialises the returned values according to the logical type
/// of the column, which is specified by the type `U`. The container for
/// returned values must be provided by the caller, though `values` will
/// ensure it has sufficient capacity.
/// `all_values` materialises the returned values according to the logical
/// type of the column, which is specified by the type `U`. The
/// container for returned values must be provided by the caller, though
/// `values` will ensure it has sufficient capacity.
pub fn all_values<U>(&self, mut dst: Vec<U>) -> Vec<U>
where
U: From<T>,
@ -648,8 +647,9 @@ mod test {
assert_eq!(v.max::<i64>(&[0, 1, 2, 3, 4]), 110);
// Test behaviour with non-real numbers - NaN should be the maximum.
// let v = Fixed::<f64>::from(vec![11.2, 3.32, std::f64::NAN, 34.9].as_slice());
// assert!(v.max::<f64>(&[0, 1, 2, 3]).is_nan());
// let v = Fixed::<f64>::from(vec![11.2, 3.32, std::f64::NAN,
// 34.9].as_slice()); assert!(v.max::<f64>(&[0, 1, 2,
// 3]).is_nan());
}
#[test]
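The 87.5% figure in the module docs follows directly from the byte widths: 1 byte per value instead of 8. A small sketch of the checked physical-type downcast (note that 232 from the doc's example values actually needs at least an `i16`):

```rust
use std::convert::TryFrom;
use std::mem::size_of;

/// Try to store logical i64 values using a physically smaller i8 vector.
/// Returns None when any value does not fit the narrower type.
fn shrink_to_i8(logical: &[i64]) -> Option<Vec<i8>> {
    logical.iter().map(|&v| i8::try_from(v).ok()).collect()
}

fn main() {
    // 232 exceeds i8::MAX, so this particular set of values needs i16 or wider
    assert!(shrink_to_i8(&[122, 232, 33, 0, -12]).is_none());

    let logical: Vec<i64> = vec![122, 23, 33, 0, -12];
    let physical = shrink_to_i8(&logical).unwrap();
    // 8 bytes per value down to 1: an 87.5% reduction in column data size
    assert_eq!(logical.len() * size_of::<i64>(), 40);
    assert_eq!(physical.len() * size_of::<i8>(), 5);
}
```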

View File

@ -3,10 +3,10 @@
//! This encoding stores a column of fixed-width numerical values potentially
//! using a smaller physical type in memory than the provided logical type.
//!
//! For example, if you have a column with 64-bit integers: [122, 232, 33, 0, -12]
//! then you can reduce the space needed to store them, by converting them as a
//! `Vec<i8>` instead of a `Vec<i64>`. In this case, this reduces the size of
//! the column data by 87.5% and generally should increase throughput of
//! For example, if you have a column with 64-bit integers: [122, 232, 33, 0,
//! -12] then you can reduce the space needed to store them, by converting them
//! as a `Vec<i8>` instead of a `Vec<i64>`. In this case, this reduces the size
//! of the column data by 87.5% and generally should increase throughput of
//! operations on the column data.
//!
//! The encodings within this module do not concern themselves with choosing the
@ -179,10 +179,10 @@ where
/// provided row IDs.
///
/// TODO(edd): I have experimented with using the Arrow kernels for these
    /// aggregation methods but they're currently significantly slower than this
    /// implementation (about 85% in the `sum` case). We will revisit them in
    /// the future as they would simplify the implementation of these aggregation
    /// functions.
    /// aggregation methods but they're currently significantly slower than
    /// this implementation (about 85% in the `sum` case). We will revisit
    /// them in the future as they would simplify the implementation of these
    /// aggregation functions.
pub fn sum(&self, row_ids: &[u32]) -> Option<T::Native>
where
T::Native: std::ops::Add<Output = T::Native>,
@ -487,9 +487,9 @@ where
// }
// }
//
// impl From<&[Option<i64>]> for FixedNull<arrow_deps::arrow::datatypes::Int64Type> {
// fn from(v: &[i64]) -> Self {
// Self{
// impl From<&[Option<i64>]> for
// FixedNull<arrow_deps::arrow::datatypes::Int64Type> { fn from(v: &[i64])
// -> Self { Self{
// arr: PrimitiveArray::from(v.to_vec()),
// }
// }
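The TODO above compares a hand-rolled aggregation against the Arrow compute kernels. A sketch of both forms over a nullable `Int64Array`; `arrow::compute::sum` is the kernel assumed here, and nothing in this sketch says anything about their relative performance:

```rust
use arrow::array::{Array, Int64Array};
use arrow::compute;

/// Hand-rolled, null-aware sum over the whole array: NULL slots are skipped,
/// and the result is None if every slot is NULL.
fn manual_sum(arr: &Int64Array) -> Option<i64> {
    let mut sum = None;
    for i in 0..arr.len() {
        if !arr.is_null(i) {
            sum = Some(sum.unwrap_or(0) + arr.value(i));
        }
    }
    sum
}

fn main() {
    let arr = Int64Array::from(vec![Some(1), None, Some(2)]);
    assert_eq!(manual_sum(&arr), Some(3));
    // the equivalent Arrow compute kernel
    assert_eq!(compute::sum(&arr), Some(3));
}
```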

View File

@ -15,9 +15,8 @@ use column::AggregateType;
use partition::Partition;
use segment::ColumnName;
/// The Segment Store is responsible for providing read access to partition data.
///
///
/// The Segment Store is responsible for providing read access to partition
/// data.
#[derive(Default)]
pub struct Store {
// A mapping from database name (tenant id, bucket id etc) to a database.
@ -173,8 +172,8 @@ impl Store {
None
}
/// Returns the distinct set of tag keys (column names) matching the provided
/// optional predicates and time range.
/// Returns the distinct set of tag keys (column names) matching the
/// provided optional predicates and time range.
pub fn tag_keys(
&self,
database_name: &str,
@ -379,8 +378,8 @@ impl Database {
todo!()
}
/// Returns the distinct set of tag keys (column names) matching the provided
/// optional predicates and time range.
/// Returns the distinct set of tag keys (column names) matching the
/// provided optional predicates and time range.
pub fn tag_keys(
&self,
table_name: &str,

View File

@ -104,8 +104,8 @@ impl Partition {
todo!()
}
/// Returns the distinct set of tag keys (column names) matching the provided
/// optional predicates and time range.
/// Returns the distinct set of tag keys (column names) matching the
/// provided optional predicates and time range.
pub fn tag_keys(
&self,
table_name: String,

View File

@ -139,9 +139,10 @@ impl Segment {
&self.columns[*self.all_columns_by_name.get(name).unwrap()]
}
// Takes a `ColumnName`, looks up that column in the `Segment`, and returns a reference to
// that column's name owned by the `Segment` along with a reference to the column itself.
// The returned column name will have the lifetime of `self`, not the lifetime of the input.
// Takes a `ColumnName`, looks up that column in the `Segment`, and returns a
// reference to that column's name owned by the `Segment` along with a
// reference to the column itself. The returned column name will have the
// lifetime of `self`, not the lifetime of the input.
fn column_name_and_column(&self, name: ColumnName<'_>) -> (&str, &Column) {
let (column_name, column_index) = self.all_columns_by_name.get_key_value(name).unwrap();
(column_name, &self.columns[*column_index])
@ -170,7 +171,6 @@ impl Segment {
///
/// Methods for reading the segment.
///
/// Returns a set of materialised column values that satisfy a set of
/// predicates.
@ -553,8 +553,8 @@ impl ColumnType {
// // HashGroup specifies that groupings should be done using a hashmap.
// HashGroup,
// // SortGroup specifies that groupings should be determined by first sorting
// // the data to be grouped by the group-key.
// // SortGroup specifies that groupings should be determined by first
// sorting // the data to be grouped by the group-key.
// SortGroup,
// }
@ -1067,32 +1067,69 @@ west,POST,304,101,203
let cases = vec![
("az", &(Operator::Equal, Value::String("west")), false), // no az column
("region", &(Operator::Equal, Value::String("west")), true), // region column does contain "west"
("region", &(Operator::Equal, Value::String("over")), true), // region column might contain "over"
("region", &(Operator::Equal, Value::String("abc")), false), // region column can't contain "abc"
("region", &(Operator::Equal, Value::String("zoo")), false), // region column can't contain "zoo"
("region", &(Operator::Equal, Value::String("west")), true), /* region column does
* contain "west" */
("region", &(Operator::Equal, Value::String("over")), true), /* region column might
* contain "over" */
("region", &(Operator::Equal, Value::String("abc")), false), /* region column can't
* contain "abc" */
("region", &(Operator::Equal, Value::String("zoo")), false), /* region column can't
* contain "zoo" */
(
"region",
&(Operator::NotEqual, Value::String("hello")),
true,
), // region column might not contain "hello"
("method", &(Operator::NotEqual, Value::String("GET")), false), // method must only contain "GET"
("region", &(Operator::GT, Value::String("abc")), true), // region column might contain something > "abc"
("region", &(Operator::GT, Value::String("north")), true), // region column might contain something > "north"
("region", &(Operator::GT, Value::String("west")), false), // region column can't contain something > "west"
("region", &(Operator::GTE, Value::String("abc")), true), // region column might contain something ≥ "abc"
("region", &(Operator::GTE, Value::String("east")), true), // region column might contain something ≥ "east"
("region", &(Operator::GTE, Value::String("west")), true), // region column might contain something ≥ "west"
("region", &(Operator::GTE, Value::String("zoo")), false), // region column can't contain something ≥ "zoo"
("region", &(Operator::LT, Value::String("foo")), true), // region column might contain something < "foo"
("region", &(Operator::LT, Value::String("north")), true), // region column might contain something < "north"
("region", &(Operator::LT, Value::String("south")), true), // region column might contain something < "south"
("region", &(Operator::LT, Value::String("east")), false), // region column can't contain something < "east"
("region", &(Operator::LT, Value::String("abc")), false), // region column can't contain something < "abc"
("region", &(Operator::LTE, Value::String("east")), true), // region column might contain something ≤ "east"
("region", &(Operator::LTE, Value::String("north")), true), // region column might contain something ≤ "north"
("region", &(Operator::LTE, Value::String("south")), true), // region column might contain something ≤ "south"
("region", &(Operator::LTE, Value::String("abc")), false), // region column can't contain something ≤ "abc"
("method", &(Operator::NotEqual, Value::String("GET")), false), /* method must only
* contain "GET" */
("region", &(Operator::GT, Value::String("abc")), true), /* region column might
* contain something >
* "abc" */
("region", &(Operator::GT, Value::String("north")), true), /* region column might
* contain something >
* "north" */
("region", &(Operator::GT, Value::String("west")), false), /* region column can't
* contain something >
* "west" */
("region", &(Operator::GTE, Value::String("abc")), true), /* region column might
* contain something
* "abc" */
("region", &(Operator::GTE, Value::String("east")), true), /* region column might
* contain something
* "east" */
("region", &(Operator::GTE, Value::String("west")), true), /* region column might
* contain something
* "west" */
("region", &(Operator::GTE, Value::String("zoo")), false), /* region column can't
* contain something
* "zoo" */
("region", &(Operator::LT, Value::String("foo")), true), /* region column might
* contain something <
* "foo" */
("region", &(Operator::LT, Value::String("north")), true), /* region column might
* contain something <
* "north" */
("region", &(Operator::LT, Value::String("south")), true), /* region column might
* contain something <
* "south" */
("region", &(Operator::LT, Value::String("east")), false), /* region column can't
* contain something <
* "east" */
("region", &(Operator::LT, Value::String("abc")), false), /* region column can't
* contain something <
* "abc" */
("region", &(Operator::LTE, Value::String("east")), true), /* region column might
* contain something
* "east" */
("region", &(Operator::LTE, Value::String("north")), true), /* region column might
* contain something
* "north" */
("region", &(Operator::LTE, Value::String("south")), true), /* region column might
* contain something
* "south" */
("region", &(Operator::LTE, Value::String("abc")), false), /* region column can't
* contain something
* "abc" */
];
for (column_name, predicate, exp) in cases {

View File

@ -128,10 +128,10 @@ impl Table {
/// Returns vectors of columnar data for the specified column
/// selections.
///
/// Results may be filtered by (currently only) conjunctive (AND) predicates,
/// but can be ranged by time, which should be represented as nanoseconds
/// since the epoch. Results are included if they satisfy the predicate and
    /// fall within the [min, max) time range domain.
/// Results may be filtered by (currently only) conjunctive (AND)
/// predicates, but can be ranged by time, which should be represented
/// as nanoseconds since the epoch. Results are included if they satisfy
    /// the predicate and fall within the [min, max) time range domain.
pub fn select<'input>(
&self,
columns: &[ColumnName<'input>],
@ -180,15 +180,21 @@ impl Table {
aggregates: &'input [(ColumnName<'input>, AggregateType)],
) -> ReadGroupResults<'input, '_> {
if !self.has_all_columns(&group_columns) {
return ReadGroupResults::default(); //TODO(edd): return an error here "group key column x not found"
return ReadGroupResults::default(); //TODO(edd): return an error
// here "group key column x not
// found"
}
if !self.has_all_columns(&aggregates.iter().map(|(name, _)| *name).collect::<Vec<_>>()) {
return ReadGroupResults::default(); //TODO(edd): return an error here "aggregate column x not found"
return ReadGroupResults::default(); //TODO(edd): return an error
// here "aggregate column x not
// found"
}
if !self.has_all_columns(&predicates.iter().map(|(name, _)| *name).collect::<Vec<_>>()) {
return ReadGroupResults::default(); //TODO(edd): return an error here "predicate column x not found"
return ReadGroupResults::default(); //TODO(edd): return an error
// here "predicate column x not
// found"
}
// identify segments where time range and predicates match could match
@ -392,8 +398,8 @@ impl Table {
// ---- Schema API queries
//
/// Returns the distinct set of tag keys (column names) matching the provided
/// optional predicates and time range.
/// Returns the distinct set of tag keys (column names) matching the
/// provided optional predicates and time range.
pub fn tag_keys<'a>(
&self,
time_range: (i64, i64),
@ -403,9 +409,9 @@ impl Table {
// Firstly, this should short-circuit early if all of the table's columns
// are present in `found_columns`.
//
// Otherwise, identify segments where time range and predicates match could match
// using segment meta data and then execute against those segments and
// merge results.
// Otherwise, identify segments where time range and predicates match could
// match using segment meta data and then execute against those segments
// and merge results.
todo!();
}
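The `select` docs above combine AND-ed column predicates with a half-open `[min, max)` time range. A small sketch of that row-filtering rule over plain in-memory columns rather than the table's segments:

```rust
/// A row satisfies the selection when its timestamp falls in [min, max)
/// and *every* (column, expected value) predicate matches (conjunctive AND).
/// Sketch only.
fn select_rows(
    time: &[i64],
    columns: &[(&str, Vec<&str>)],
    predicates: &[(&str, &str)],
    time_range: (i64, i64),
) -> Vec<usize> {
    let (min, max) = time_range;
    (0..time.len())
        .filter(|&row| time[row] >= min && time[row] < max)
        .filter(|&row| {
            predicates.iter().all(|(col_name, want)| {
                columns
                    .iter()
                    .find(|(name, _)| name == col_name)
                    .map_or(false, |(_, values)| values[row] == *want)
            })
        })
        .collect()
}

fn main() {
    let time = vec![100, 200, 300];
    let columns = vec![
        ("region", vec!["west", "east", "west"]),
        ("method", vec!["GET", "GET", "POST"]),
    ];
    // region = "west" AND method = "GET", time in [100, 300)
    let rows = select_rows(
        &time,
        &columns,
        &[("region", "west"), ("method", "GET")],
        (100, 300),
    );
    assert_eq!(rows, vec![0]);
}
```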

View File

@ -41,8 +41,8 @@ pub enum Error {
#[allow(dead_code)]
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// An in-memory buffer of a write ahead log. It is split up into segments, which can be persisted
/// to object storage.
/// An in-memory buffer of a write ahead log. It is split up into segments,
/// which can be persisted to object storage.
#[derive(Debug)]
pub struct Buffer {
max_size: u64,
@ -66,9 +66,10 @@ impl Buffer {
}
}
/// Appends a replicated write onto the buffer, returning the segment if it has been closed out.
/// If the max size of the buffer would be exceeded by accepting the write, the oldest (first)
/// of the closed segments will be dropped, if it is persisted. Otherwise, an error is returned.
/// Appends a replicated write onto the buffer, returning the segment if it
/// has been closed out. If the max size of the buffer would be exceeded
/// by accepting the write, the oldest (first) of the closed segments
/// will be dropped, if it is persisted. Otherwise, an error is returned.
#[allow(dead_code)]
pub async fn append(&mut self, write: ReplicatedWrite) -> Result<Option<Arc<Segment>>> {
let write_size = u64::try_from(write.data.len())
@ -132,15 +133,17 @@ impl Buffer {
self.current_size
}
/// Returns any replicated writes from the given writer ID and sequence number onward. This
/// will include writes from other writers. The given writer ID and sequence are to identify
/// from what point to replay writes. If no write matches the given writer ID and sequence
/// Returns any replicated writes from the given writer ID and sequence
/// number onward. This will include writes from other writers. The
/// given writer ID and sequence are to identify from what point to
/// replay writes. If no write matches the given writer ID and sequence
/// number, all replicated writes within the buffer will be returned.
#[allow(dead_code)]
pub fn all_writes_since(&self, since: WriterSequence) -> Vec<Arc<ReplicatedWrite>> {
let mut writes = Vec::new();
// start with the newest writes and go back. Hopefully they're asking for something recent.
// start with the newest writes and go back. Hopefully they're asking for
// something recent.
for w in self.open_segment.writes.iter().rev() {
if w.equal_to_writer_and_sequence(since.id, since.sequence) {
writes.reverse();
@ -163,14 +166,16 @@ impl Buffer {
writes
}
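
A small usage sketch of the replay API described above; it assumes a Buffer and a WriterSequence marker are already in hand, since their constructors are not shown in this hunk.

use std::sync::Arc;

fn replay_all_from(buffer: &Buffer, since: WriterSequence) -> Vec<Arc<ReplicatedWrite>> {
    // Returns every buffered write after the marker, from all writers; if the
    // marker is not found, the entire buffer is returned (see the doc above).
    buffer.all_writes_since(since)
    // buffer.writes_since(since) would instead restrict the replay to writes
    // from since.id only.
}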
/// Returns replicated writes from the given writer ID and sequence number onward. This returns
/// only writes from the passed in writer ID. If no write matches the given writer ID and sequence
/// number, all replicated writes within the buffer for that writer will be returned.
/// Returns replicated writes from the given writer ID and sequence number
/// onward. This returns only writes from the passed in writer ID. If no
/// write matches the given writer ID and sequence number, all
/// replicated writes within the buffer for that writer will be returned.
#[allow(dead_code)]
pub fn writes_since(&self, since: WriterSequence) -> Vec<Arc<ReplicatedWrite>> {
let mut writes = Vec::new();
// start with the newest writes and go back. Hopefully they're asking for something recent.
// start with the newest writes and go back. Hopefully they're asking for
// something recent.
for w in self.open_segment.writes.iter().rev() {
let (writer, sequence) = w.writer_and_sequence();
if writer == since.id {
@ -230,7 +235,8 @@ impl Segment {
}
}
// appends the write to the segment, keeping track of the summary information about the writer
// appends the write to the segment, keeping track of the summary information
// about the writer
#[allow(dead_code)]
fn append(&mut self, write: ReplicatedWrite) -> Result<()> {
let (writer_id, sequence_number) = write.writer_and_sequence();
@ -244,8 +250,9 @@ impl Segment {
Ok(())
}
// checks that the sequence numbers in this segment are monotonically increasing. Also keeps
// track of the starting and ending sequence numbers and if any were missing.
// checks that the sequence numbers in this segment are monotonically
// increasing. Also keeps track of the starting and ending sequence numbers
// and if any were missing.
fn validate_and_update_sequence_summary(
&mut self,
writer_id: WriterId,

View File

@ -1,27 +1,31 @@
//! This crate contains code that defines the logic for a running InfluxDB IOx server. It
//! also has the logic for how servers talk to each other, which includes replication,
//! subscriptions, querying, and traits that abstract these methods away for testing purposes.
//! This crate contains code that defines the logic for a running InfluxDB IOx
//! server. It also has the logic for how servers talk to each other, which
//! includes replication, subscriptions, querying, and traits that abstract
//! these methods away for testing purposes.
//!
//! This diagram shows the lifecycle of a write coming into a set of IOx servers
//! configured in different roles. This doesn't include ensuring that the replicated
//! writes are durable, or snapshotting partitions in the write buffer. Have a read
//! through the comments in the source before trying to make sense of this diagram.
//! configured in different roles. This doesn't include ensuring that the
//! replicated writes are durable, or snapshotting partitions in the write
//! buffer. Have a read through the comments in the source before trying to make
//! sense of this diagram.
//!
//! Each level of servers exists to serve a specific function, ideally isolating the
//! kinds of failures that would cause one portion to go down.
//! Each level of servers exists to serve a specific function, ideally isolating
//! the kinds of failures that would cause one portion to go down.
//!
//! The router level simply converts the line protocol to the flatbuffer format and
//! computes the partition key. It keeps no state.
//! The router level simply converts the line protocol to the flatbuffer format
//! and computes the partition key. It keeps no state.
//!
//! The HostGroup/AZ level is for receiving the replicated writes and keeping multiple
//! copies of those writes in memory before they are persisted to object storage. Individual
//! databases or groups of databases can be routed to the same set of host groups, which
//! will limit the blast radius for databases that overload the system with writes or
//! for situations where subscribers lag too far behind.
//! The HostGroup/AZ level is for receiving the replicated writes and keeping
//! multiple copies of those writes in memory before they are persisted to
//! object storage. Individual databases or groups of databases can be routed to
//! the same set of host groups, which will limit the blast radius for databases
//! that overload the system with writes or for situations where subscribers lag
//! too far behind.
//!
//! The Subscriber level is for actually pulling in the data and making it available for
//! query through indexing in the write buffer or writing that data out to Parquet in object
//! storage. Subscribers can also be used for real-time alerting and monitoring.
//! The Subscriber level is for actually pulling in the data and making it
//! available for query through indexing in the write buffer or writing that
//! data out to Parquet in object storage. Subscribers can also be used for
//! real-time alerting and monitoring.
//!
//! ```text
//! ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─

View File

@ -63,9 +63,9 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// `Server` is the container struct for how servers store data internally, as well as how they
/// communicate with other servers. Each server will have one of these structs, which keeps track
/// of all replication and query rules.
/// `Server` is the container struct for how servers store data internally, as
/// well as how they communicate with other servers. Each server will have one
/// of these structs, which keeps track of all replication and query rules.
#[derive(Debug)]
pub struct Server<M: ConnectionManager> {
config: Config,
@ -90,7 +90,8 @@ impl<M: ConnectionManager> Server<M> {
}
}
/// sets the id of the server, which is used for replication and the base path in object storage
/// sets the id of the server, which is used for replication and the base
/// path in object storage
pub fn set_id(&mut self, id: u32) {
self.config.id = Some(id);
}
@ -99,8 +100,8 @@ impl<M: ConnectionManager> Server<M> {
self.config.id.context(IdNotSet)
}
/// Tells the server the set of rules for a database. Currently, this is not persisted and
/// is for in-memory processing rules only.
/// Tells the server the set of rules for a database. Currently, this is not
/// persisted and is for in-memory processing rules only.
pub async fn create_database(
&mut self,
db_name: impl Into<String>,
@ -130,9 +131,9 @@ impl<M: ConnectionManager> Server<M> {
Ok(())
}
/// Creates a host group with a set of connection strings to hosts. These host connection
/// strings should be something that the connection manager can use to return a remote server
/// to work with.
/// Creates a host group with a set of connection strings to hosts. These
/// host connection strings should be something that the connection
/// manager can use to return a remote server to work with.
pub async fn create_host_group(&mut self, id: HostGroupId, hosts: Vec<String>) -> Result<()> {
// Return an error if this server hasn't yet been setup with an id
self.require_id()?;
@ -144,8 +145,9 @@ impl<M: ConnectionManager> Server<M> {
Ok(())
}
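
Taken together, the calls documented above suggest a setup flow roughly like the sketch below. The second argument to create_database is not visible in this hunk, so the DatabaseRules::default() value (and the assumption that HostGroupId is string-like) are illustrative guesses, not the confirmed API.

async fn configure<M: ConnectionManager>(server: &mut Server<M>) -> Result<()> {
    // Replication and the object-store base path need a writer id first.
    server.set_id(42);

    // In-memory rules only for now, per the doc comment above; the rules value
    // passed here is assumed.
    server.create_database("mydb", DatabaseRules::default()).await?;

    // Connection strings that the ConnectionManager can turn into remote servers.
    server
        .create_host_group("az-1".to_string(), vec!["http://host-a:8082".to_string()])
        .await?;

    Ok(())
}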
/// Saves the configuration of database rules and host groups to a single JSON file in
/// the configured store under a directory /<writer ID>/config.json
/// Saves the configuration of database rules and host groups to a single
/// JSON file in the configured store under a directory /<writer
/// ID>/config.json
pub async fn store_configuration(&self) -> Result<()> {
let id = self.require_id()?;
@ -166,8 +168,8 @@ impl<M: ConnectionManager> Server<M> {
Ok(())
}
/// Loads the configuration for this server from the configured store. This replaces
/// any in-memory configuration that might already be set.
/// Loads the configuration for this server from the configured store. This
/// replaces any in-memory configuration that might already be set.
pub async fn load_configuration(&mut self, id: u32) -> Result<()> {
let location = config_location(id);
@ -187,9 +189,10 @@ impl<M: ConnectionManager> Server<M> {
Ok(())
}
/// `write_lines` takes in raw line protocol and converts it to a `ReplicatedWrite`, which
/// is then replicated to other servers based on the configuration of the `db`.
/// This is step #1 from the crate level documentation.
/// `write_lines` takes in raw line protocol and converts it to a
/// `ReplicatedWrite`, which is then replicated to other servers based
/// on the configuration of the `db`. This is step #1 from the crate
/// level documentation.
pub async fn write_lines(&self, db_name: &str, lines: &[ParsedLine<'_>]) -> Result<()> {
let id = self.require_id()?;
@ -260,9 +263,9 @@ impl<M: ConnectionManager> Server<M> {
Ok(())
}
// replicates to a single host in the group based on hashing rules. If that host is unavailable
// an error will be returned. The request may still succeed if enough of the other host groups
// have returned a success.
// replicates to a single host in the group based on hashing rules. If that host
// is unavailable an error will be returned. The request may still succeed
// if enough of the other host groups have returned a success.
async fn replicate_to_host_group(
&self,
host_group_id: &str,
@ -275,8 +278,8 @@ impl<M: ConnectionManager> Server<M> {
.get(host_group_id)
.context(HostGroupNotFound { id: host_group_id })?;
// TODO: handle hashing rules to determine which host in the group should get the write.
// for now, just write to the first one.
// TODO: handle hashing rules to determine which host in the group should get
// the write. for now, just write to the first one.
let host = group
.hosts
.get(0)
@ -299,9 +302,9 @@ impl<M: ConnectionManager> Server<M> {
}
}
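
The TODO above leaves host selection at "first host in the group". Below is a generic sketch of what hash-based selection over the partition key could look like; it is illustrative only, not the planned consistent-hashing scheme.

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Pick one host from the group by hashing the partition key, so the same key
// always lands on the same host while different keys spread across the group.
fn pick_host<'a>(hosts: &'a [String], partition_key: &str) -> Option<&'a String> {
    if hosts.is_empty() {
        return None;
    }
    let mut hasher = DefaultHasher::new();
    partition_key.hash(&mut hasher);
    let idx = (hasher.finish() % hosts.len() as u64) as usize;
    hosts.get(idx)
}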
/// The `Server` will ask the `ConnectionManager` for connections to a specific remote server.
/// These connections can be used to communicate with other servers.
/// This is implemented as a trait for dependency injection in testing.
/// The `Server` will ask the `ConnectionManager` for connections to a specific
/// remote server. These connections can be used to communicate with other
/// servers. This is implemented as a trait for dependency injection in testing.
#[async_trait]
pub trait ConnectionManager {
type Error: std::error::Error + Send + Sync + 'static;
@ -311,12 +314,14 @@ pub trait ConnectionManager {
async fn remote_server(&self, connect: &str) -> Result<Arc<Self::RemoteServer>, Self::Error>;
}
/// The `RemoteServer` represents the API for replicating, subscribing, and querying other servers.
/// The `RemoteServer` represents the API for replicating, subscribing, and
/// querying other servers.
#[async_trait]
pub trait RemoteServer {
type Error: std::error::Error + Send + Sync + 'static;
/// Sends a replicated write to a remote server. This is step #2 from the diagram.
/// Sends a replicated write to a remote server. This is step #2 from the
/// diagram.
async fn replicate(
&self,
db: &str,

View File

@ -1,5 +1,5 @@
//! This module contains code for snapshotting a database partition to Parquet files in object
//! storage.
//! This module contains code for snapshotting a database partition to Parquet
//! files in object storage.
use arrow_deps::{
arrow::record_batch::RecordBatch,
parquet::{self, arrow::ArrowWriter, file::writer::TryClone},
@ -281,7 +281,8 @@ struct MemWriter {
}
impl MemWriter {
/// Returns the inner buffer as long as there are no other references to the Arc.
/// Returns the inner buffer as long as there are no other references to the
/// Arc.
pub fn into_inner(self) -> Option<Vec<u8>> {
Arc::try_unwrap(self.mem)
.ok()

View File

@ -1,4 +1,5 @@
//! This program prints which x86 features are available on this processor
//! This program prints which x86 features are available on this
//! processor
macro_rules! check_feature {
($name: tt) => {

View File

@ -20,8 +20,8 @@ pub enum OrgBucketMappingError {
/// Map an InfluxDB 2.X org & bucket into an IOx DatabaseName.
///
/// This function ensures the mapping is unambiguous by requiring both `org` and
/// `bucket` to not contain the `_` character in addition to the [`DatabaseName`]
/// validation.
/// `bucket` to not contain the `_` character in addition to the
/// [`DatabaseName`] validation.
pub(crate) fn org_and_bucket_to_database<'a, O: AsRef<str>, B: AsRef<str>>(
org: O,
bucket: B,

View File

@ -278,7 +278,8 @@ struct ReadInfo {
sql_query: String,
}
// TODO: figure out how to stream read results out rather than rendering the whole thing in mem
// TODO: figure out how to stream read results out rather than rendering the
// whole thing in mem
#[tracing::instrument(level = "debug")]
async fn read<T: DatabaseStore>(
req: hyper::Request<Body>,
@ -396,7 +397,8 @@ async fn snapshot_partition<T: DatabaseStore>(
let db_name =
org_and_bucket_to_database(&snapshot.org, &snapshot.bucket).context(BucketMappingError)?;
// TODO: refactor the rest of this out of the http route and into the server crate.
// TODO: refactor the rest of this out of the http route and into the server
// crate.
let db = server
.write_buffer
.db(&db_name)

View File

@ -1,5 +1,6 @@
//! This module has logic to translate gRPC structures into the native
//! storage system form by extending the builders for those structures with new traits
//! storage system form by extending the builders for those structures with new
//! traits
//!
//! RPCPredicate --> query::Predicates
//!
@ -133,17 +134,21 @@ impl AddRPCNode for PredicateBuilder {
/// Adds the predicates represented by the Node (predicate tree)
/// into predicates that can be evaluated by the storage system
///
/// RPC predicates can have several different types of 'predicate' embedded in them.
/// RPC predicates can have several different types of 'predicate' embedded
/// in them.
///
/// Predicates on tag value (where a tag is a column)
///
/// Predicates on field value (where field is also a column)
///
/// Predicates on 'measurement name' (encoded as tag_ref=\x00), aka select from a particular table
/// Predicates on 'measurement name' (encoded as tag_ref=\x00), aka select
/// from a particular table
///
/// Predicates on 'field name' (encoded as tag_ref=\xff), aka select only specific fields
/// Predicates on 'field name' (encoded as tag_ref=\xff), aka select only
/// specific fields
///
/// This code pulls apart the predicates, if any, into a StoragePredicate that breaks the predicate apart
/// This code pulls apart the predicates, if any, into a StoragePredicate
/// that breaks the predicate apart
fn rpc_predicate(self, rpc_predicate: Option<RPCPredicate>) -> Result<Self> {
match rpc_predicate {
// no input predicate, is fine
@ -183,7 +188,6 @@ impl AddRPCNode for PredicateBuilder {
/// ```
/// child
/// ```
///
fn normalize_node(node: RPCNode) -> Result<RPCNode> {
let RPCNode {
node_type,
@ -213,7 +217,8 @@ fn normalize_node(node: RPCNode) -> Result<RPCNode> {
}
}
/// Converts the node and updates the `StoragePredicate` being built, as appropriate
/// Converts the node and updates the `StoragePredicate` being built, as
/// appropriate
///
/// It recognizes special predicate patterns and pulls them into
/// the fields on `StoragePredicate` for special processing. If no
@ -257,8 +262,8 @@ fn flatten_ands(node: RPCNode, mut dst: Vec<RPCNode>) -> Result<Vec<RPCNode>> {
// Represents a predicate like <expr> IN (option1, option2, option3, ....)
//
// use `try_from_node` to convert a tree like (((expr = option1) OR (expr = option2)) OR (expr = option3)) ...
// into such a form
// use `try_from_node` to convert a tree like (((expr = option1) OR (expr =
// option2)) OR (expr = option3)) ... into such a form
#[derive(Debug)]
struct InList {
lhs: RPCNode,
@ -268,8 +273,8 @@ struct InList {
impl TryFrom<&RPCNode> for InList {
type Error = &'static str;
/// If node represents an OR tree like (expr = option1) OR (expr=option2)... extracts
/// an InList like expr IN (option1, option2)
/// If node represents an OR tree like (expr = option1) OR (expr=option2)...
/// extracts an InList like expr IN (option1, option2)
fn try_from(node: &RPCNode) -> Result<Self, &'static str> {
InListBuilder::default().append(node)?.build()
}
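
To make the OR-tree to IN-list rewrite above concrete, here is a deliberately simplified illustration on a toy expression type; the real code walks RPCNode trees through InListBuilder, which is considerably more involved.

// Toy types, not the gRPC structures.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Eq(String, String),       // column = literal
    Or(Box<Expr>, Box<Expr>), // left OR right
}

// Collects (col = v1) OR (col = v2) OR ... into (col, [v1, v2, ...]) when all
// comparisons are against the same column; otherwise gives up.
fn to_in_list(expr: &Expr) -> Option<(String, Vec<String>)> {
    match expr {
        Expr::Eq(col, val) => Some((col.clone(), vec![val.clone()])),
        Expr::Or(l, r) => {
            let (lcol, mut lvals) = to_in_list(l)?;
            let (rcol, rvals) = to_in_list(r)?;
            if lcol != rcol {
                return None;
            }
            lvals.extend(rvals);
            Some((lcol, lvals))
        }
    }
}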
@ -646,12 +651,12 @@ impl<'a> Loggable<'a> for RPCPredicate {
}
}
/// Returns a struct that can format gRPC predicate (aka `RPCPredicates`) for Display
/// Returns a struct that can format gRPC predicate (aka `RPCPredicates`) for
/// Display
///
/// For example:
/// let pred = RPCPredicate (...);
/// println!("The predicate is {}", displayable_predicate(pred));
///
pub fn displayable_predicate(pred: Option<&RPCPredicate>) -> impl fmt::Display + '_ {
struct Wrapper<'a>(Option<&'a RPCPredicate>);
@ -941,7 +946,8 @@ mod tests {
#[test]
fn test_convert_predicate_field_selection_wrapped() {
// test wrapping the whole predicate in a None value (aka what influxql does for some reason
// test wrapping the whole predicate in a None value (aka what influxql does for
// some reason
let field_selection = make_field_ref_node("field1");
let wrapped = RPCNode {
node_type: RPCNodeType::ParenExpression as i32,

View File

@ -9,9 +9,10 @@ use query::id::Id;
use std::convert::TryInto;
/// This trait implements extraction of information from all storage gRPC requests. The only method
/// required to implement is `read_source_field` because for some requests the field is named
/// `read_source` and for others it is `tags_source`.
/// This trait implements extraction of information from all storage gRPC
/// requests. The only method required to implement is `read_source_field`
/// because for some requests the field is named `read_source` and for others it
/// is `tags_source`.
pub trait GrpcInputs {
fn read_source_field(&self) -> Option<&prost_types::Any>;

View File

@ -179,7 +179,8 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
impl From<Error> for tonic::Status {
/// Converts a result from the business logic into the appropriate tonic status
/// Converts a result from the business logic into the appropriate tonic
/// status
fn from(err: Error) -> Self {
error!("Error handling gRPC request: {}", err);
err.to_status()
@ -187,7 +188,8 @@ impl From<Error> for tonic::Status {
}
impl Error {
/// Converts a result from the business logic into the appropriate tonic status
/// Converts a result from the business logic into the appropriate tonic
/// status
fn to_status(&self) -> tonic::Status {
match &self {
Self::ServerError { .. } => Status::internal(self.to_string()),
@ -476,7 +478,8 @@ where
let measurement = None;
// Special case a request for 'tag_key=_measurement' means to list all measurements
// Special case a request for 'tag_key=_measurement' means to list all
// measurements
let response = if tag_key.is_measurement() {
info!(
"tag_values with tag_key=[x00] (measurement name) for database {}, range: {:?}, predicate: {} --> returning measurement_names",
@ -848,7 +851,8 @@ where
Ok(StringValuesResponse { values })
}
/// Return tag keys with optional measurement, timestamp and arbitrary predicates
/// Return tag keys with optional measurement, timestamp and arbitrary
/// predicates
async fn tag_keys_impl<T>(
db_store: Arc<T>,
executor: Arc<QueryExecutor>,
@ -902,7 +906,8 @@ where
Ok(StringValuesResponse { values })
}
/// Return tag values for tag_name, with optional measurement, timestamp and arbitrary predicates
/// Return tag values for tag_name, with optional measurement, timestamp and
/// arbitrary predicates
async fn tag_values_impl<T>(
db_store: Arc<T>,
executor: Arc<QueryExecutor>,
@ -1106,7 +1111,8 @@ where
Ok(())
}
/// Return field names, restricted via optional measurement, timestamp and predicate
/// Return field names, restricted via optional measurement, timestamp and
/// predicate
async fn field_names_impl<T>(
db_store: Arc<T>,
executor: Arc<QueryExecutor>,
@ -1352,7 +1358,8 @@ mod tests {
predicate: None,
};
// Note we don't set the column_names on the test database, so we expect an error
// Note we don't set the column_names on the test database, so we expect an
// error
let response = fixture.storage_client.tag_keys(request).await;
assert!(response.is_err());
let response_string = format!("{:?}", response);
@ -1372,9 +1379,9 @@ mod tests {
Ok(())
}
/// test the plumbing of the RPC layer for measurement_tag_keys-- specifically that
/// the right parameters are passed into the Database interface
/// and that the returned values are sent back via gRPC.
/// test the plumbing of the RPC layer for measurement_tag_keys--
/// specifically that the right parameters are passed into the Database
/// interface and that the returned values are sent back via gRPC.
#[tokio::test]
async fn test_storage_rpc_measurement_tag_keys() -> Result<(), tonic::Status> {
// Start a test gRPC server on a randomly allocated port
@ -1446,7 +1453,8 @@ mod tests {
predicate: None,
};
// Note we don't set the column_names on the test database, so we expect an error
// Note we don't set the column_names on the test database, so we expect an
// error
let response = fixture.storage_client.measurement_tag_keys(request).await;
assert!(response.is_err());
let response_string = format!("{:?}", response);
@ -1577,7 +1585,8 @@ mod tests {
tag_key: "the_tag_key".into(),
};
// Note we don't set the column_names on the test database, so we expect an error
// Note we don't set the column_names on the test database, so we expect an
// error
let response = fixture.storage_client.tag_values(request).await;
assert!(response.is_err());
let response_string = format!("{:?}", response);
@ -1617,9 +1626,9 @@ mod tests {
);
}
/// test the plumbing of the RPC layer for measurement_tag_values-- specifically that
/// the right parameters are passed into the Database interface
/// and that the returned values are sent back via gRPC.
/// test the plumbing of the RPC layer for measurement_tag_values--
/// specifically that the right parameters are passed into the Database
/// interface and that the returned values are sent back via gRPC.
#[tokio::test]
async fn test_storage_rpc_measurement_tag_values() {
// Start a test gRPC server on a randomly allocated port
@ -1684,7 +1693,8 @@ mod tests {
tag_key: "the_tag_key".into(),
};
// Note we don't set the column_names on the test database, so we expect an error
// Note we don't set the column_names on the test database, so we expect an
// error
let response = fixture.storage_client.measurement_tag_values(request).await;
assert!(response.is_err());
let response_string = format!("{:?}", response);
@ -2261,7 +2271,8 @@ mod tests {
struct OrgAndBucket {
org_id: u64,
bucket_id: u64,
/// The influxdb_iox database name corresponding to `org_id` and `bucket_id`
/// The influxdb_iox database name corresponding to `org_id` and
/// `bucket_id`
db_name: DatabaseName<'static>,
}
@ -2512,8 +2523,8 @@ mod tests {
Ok(responses)
}
/// Convert the StringValueResponses into rust Strings, sorting the values
/// to ensure consistency.
/// Convert the StringValueResponses into rust Strings, sorting the
/// values to ensure consistency.
fn to_string_vec(&self, responses: Vec<StringValuesResponse>) -> Vec<String> {
let mut strings = responses
.into_iter()

View File

@ -14,8 +14,9 @@ pub mod tracing;
pub type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
pub type Result<T = (), E = Error> = std::result::Result<T, E>;
/// A test helper function for asserting floating point numbers are within the machine epsilon
/// because strict comparison of floating point numbers is incorrect
/// A test helper function for asserting floating point numbers are within the
/// machine epsilon because strict comparison of floating point numbers is
/// incorrect
pub fn approximately_equal(f1: f64, f2: f64) -> bool {
(f1 - f2).abs() < f64::EPSILON
}
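
For example, a sum that is not bit-identical to its expected value still passes the helper:

#[test]
fn floats_compare_approximately() {
    // 0.1 + 0.2 is not bit-identical to 0.3, but the difference is smaller
    // than f64::EPSILON, so the helper treats the values as equal.
    assert!((0.1 + 0.2) != 0.3);
    assert!(approximately_equal(0.1 + 0.2, 0.3));
}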

View File

@ -117,7 +117,8 @@ fn convert_tsm_good_input_filename() {
// // .tempfile()
// // .expect("error creating temp file")
// // .into_temp_path();
// // let parquet_filename_string = parquet_path.to_string_lossy().to_string();
// // let parquet_filename_string =
// parquet_path.to_string_lossy().to_string();
// let assert = cmd
// .arg("convert")
@ -125,8 +126,8 @@ fn convert_tsm_good_input_filename() {
// .arg(&parquet_path)
// .assert();
// // TODO this should succeed when TSM -> parquet conversion is implemented.
// // assert
// // TODO this should succeed when TSM -> parquet conversion is
// implemented. // assert
// // .failure()
// // .code(1)
// // .stderr(predicate::str::contains("Conversion failed"))

View File

@ -1,12 +1,14 @@
// The test in this file runs the server in a separate thread and makes HTTP requests as a smoke
// test for the integration of the whole system.
// The test in this file runs the server in a separate thread and makes HTTP
// requests as a smoke test for the integration of the whole system.
//
// As written, only one test of this style can run at a time. Add more data to the existing test to
// test more scenarios rather than adding more tests in the same style.
// As written, only one test of this style can run at a time. Add more data to
// the existing test to test more scenarios rather than adding more tests in the
// same style.
//
// Or, change the way this test behaves to create isolated instances by:
//
// - Finding an unused port for the server to run on and using that port in the URL
// - Finding an unused port for the server to run on and using that port in the
// URL
// - Creating a temporary directory for an isolated database path
//
// Or, change the tests to use one server and isolate through `org_id` by:
@ -105,8 +107,8 @@ async fn read_and_write_data() -> Result<()> {
.try_into()
.expect("Unable to represent system time");
// TODO: make a more extensible way to manage data for tests, such as in external fixture
// files or with factories.
// TODO: make a more extensible way to manage data for tests, such as in
// external fixture files or with factories.
let points = vec![
influxdb2_client::DataPoint::builder("cpu_load_short")
.tag("host", "server01")
@ -189,7 +191,8 @@ async fn read_and_write_data() -> Result<()> {
text, expected_read_data
);
// Make an invalid organization WAL dir to test that the server ignores it instead of crashing
// Make an invalid organization WAL dir to test that the server ignores it
// instead of crashing
let invalid_org_dir = server.dir.path().join("not-an-org-id");
fs::create_dir(invalid_org_dir)?;
@ -670,7 +673,8 @@ async fn test_read_group_none_agg_with_predicate(
);
}
/// Create a predicate representing tag_name=tag_value in the horrible gRPC structs
/// Create a predicate representing tag_name=tag_value in the horrible gRPC
/// structs
fn make_tag_predicate(tag_name: impl Into<String>, tag_value: impl Into<String>) -> Predicate {
Predicate {
root: Some(Node {

View File

@ -12,7 +12,8 @@
//!
//! Work remaining:
//!
//! - More testing for correctness; the existing tests mostly demonstrate possible usages.
//! - More testing for correctness; the existing tests mostly demonstrate
//! possible usages.
//! - Error handling
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
@ -154,12 +155,13 @@ impl WalBuilder {
}
}
/// Set the size (in bytes) of each WAL file that should prompt a file rollover when it is
/// exceeded.
/// Set the size (in bytes) of each WAL file that should prompt a file
/// rollover when it is exceeded.
///
/// File rollover happens per sync batch. If the file is underneath this file size limit at the
/// start of a sync operation, the entire sync batch will be written to that file even if
/// some of the entries in the batch cause the file to exceed the file size limit.
/// File rollover happens per sync batch. If the file is underneath this
/// file size limit at the start of a sync operation, the entire sync
/// batch will be written to that file even if some of the entries in
/// the batch cause the file to exceed the file size limit.
///
/// See [WalBuilder::DEFAULT_FILE_ROLLOVER_SIZE_BYTES]
pub fn file_rollover_size(mut self, file_rollover_size: u64) -> Self {
@ -181,8 +183,9 @@ impl WalBuilder {
/// Consume the builder to get an iterator of all entries in this
/// WAL that have been persisted to disk.
///
/// Sequence numbers on the entries will be in increasing order, but if files have been
/// modified or deleted since getting this iterator, there may be gaps in the sequence.
/// Sequence numbers on the entries will be in increasing order, but if
/// files have been modified or deleted since getting this iterator,
/// there may be gaps in the sequence.
///
/// # Asynchronous considerations
///
@ -254,17 +257,17 @@ impl Wal {
})
}
/// A path to a file for storing arbitrary metadata about this WAL, guaranteed not to collide
/// with the data files.
/// A path to a file for storing arbitrary metadata about this WAL,
/// guaranteed not to collide with the data files.
pub fn metadata_path(&self) -> PathBuf {
self.files.root.join("metadata")
}
/// Appends a WritePayload to the active segment file in the WAL and returns its
/// assigned sequence number.
/// Appends a WritePayload to the active segment file in the WAL and returns
/// its assigned sequence number.
///
/// To ensure the data is written to disk, `sync_all` should be called after a
/// single or batch of append operations.
/// To ensure the data is written to disk, `sync_all` should be called after
/// a single or batch of append operations.
pub fn append(&mut self, payload: WritePayload) -> Result<SequenceNumber> {
let sequence_number = self.sequence_number;
@ -289,14 +292,15 @@ impl Wal {
Ok(sequence_number)
}
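
A usage sketch of the append-then-sync flow described above. The WritePayload constructor is not shown in this diff, so WritePayload::new here is an assumption made only to keep the example readable.

fn write_entry(wal: &mut Wal, bytes: Vec<u8>) -> Result<SequenceNumber> {
    let payload = WritePayload::new(bytes)?; // assumed constructor
    let sequence = wal.append(payload)?;     // may still be buffered in memory
    wal.sync_all()?;                         // flush to disk, roll the file if oversized
    Ok(sequence)
}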
/// Total size, in bytes, of all the data in all the files in the WAL. If files are deleted
/// from disk without deleting them through the WAL, the size won't reflect that deletion
/// until the WAL is recreated.
/// Total size, in bytes, of all the data in all the files in the WAL. If
/// files are deleted from disk without deleting them through the WAL,
/// the size won't reflect that deletion until the WAL is recreated.
pub fn total_size(&self) -> u64 {
self.total_size
}
/// Deletes files up to, but not including, the file that contains the entry number specified
/// Deletes files up to, but not including, the file that contains the entry
/// number specified
pub fn delete_up_to_entry(&self, entry_number: u64) -> Result<()> {
let mut iter = self.files.existing_filenames()?.peekable();
let hypothetical_filename = self
@ -315,8 +319,8 @@ impl Wal {
Ok(())
}
/// Flush all pending bytes in the active segment file to disk and closes it if it is over
/// the file rollover size.
/// Flush all pending bytes in the active segment file to disk and closes it
/// if it is over the file rollover size.
pub fn sync_all(&mut self) -> Result<()> {
let f = self.active_file.take();

View File

@ -21,16 +21,17 @@ fn delete_up_to() -> Result {
]
);
// Write one WAL entry, and because the existing file is over the size limit, this entry
// should end up in a new WAL file
// Write one WAL entry, and because the existing file is over the size limit,
// this entry should end up in a new WAL file
create_and_sync_batch!(
wal,
["some more data, this should now be rolled over into the next WAL file"]
);
// Write two WAL entries, one that could fit in the existing file but puts the file over the
// limit. Because the two entries are in one sync batch, they both will end up in the existing
// file even though it's over the limit after the first entry.
// Write two WAL entries, one that could fit in the existing file but puts the
// file over the limit. Because the two entries are in one sync batch, they
// both will end up in the existing file even though it's over the limit
// after the first entry.
create_and_sync_batch!(
wal,
[
@ -76,8 +77,8 @@ fn delete_up_to() -> Result {
let wal_entries = all_entries(&builder)?;
assert_eq!(4, wal_entries.len());
// 2 is still readable, because we asked to delete it but couldn't because it was in a file
// with 3.
// 2 is still readable, because we asked to delete it but couldn't because it
// was in a file with 3.
assert_entry!(
wal_entries[0],
2,

View File

@ -33,8 +33,8 @@ fn file_rollover() -> Result {
assert_eq!(1, wal_entries.len());
assert_entry!(wal_entries[0], 0, b"some data within the file limit");
// Write one WAL entry when there is an existing WAL file that is currently under the size
// limit, should end up in the same WAL file
// Write one WAL entry when there is an existing WAL file that is currently
// under the size limit, should end up in the same WAL file
create_and_sync_batch!(wal, ["some more data that puts the file over the limit"]);
// There should still be one existing WAL file
@ -50,8 +50,8 @@ fn file_rollover() -> Result {
b"some more data that puts the file over the limit",
);
// Write one WAL entry, and because the existing file is over the size limit, this entry
// should end up in a new WAL file
// Write one WAL entry, and because the existing file is over the size limit,
// this entry should end up in a new WAL file
create_and_sync_batch!(
wal,
["some more data, this should now be rolled over into the next WAL file"]
@ -75,9 +75,10 @@ fn file_rollover() -> Result {
b"some more data, this should now be rolled over into the next WAL file"
);
// Write two WAL entries, one that could fit in the existing file but puts the file over the
// limit. Because the two entries are in one sync batch, they both will end up in the existing
// file even though it's over the limit after the first entry.
// Write two WAL entries, one that could fit in the existing file but puts the
// file over the limit. Because the two entries are in one sync batch, they
// both will end up in the existing file even though it's over the limit
// after the first entry.
create_and_sync_batch!(
wal,
[

View File

@ -10,11 +10,13 @@ use helpers::Result;
fn total_size() -> Result {
let dir = test_helpers::tmp_dir()?;
// Set the file rollover size limit low to test how rollover interacts with total size
// Set the file rollover size limit low to test how rollover interacts with
// total size
let builder = WalBuilder::new(dir.as_ref()).file_rollover_size(100);
let mut wal = builder.clone().wal()?;
// Should start without existing WAL files; this implies total file size on disk is 0
// Should start without existing WAL files; this implies total file size on disk
// is 0
let wal_files = helpers::wal_file_names(&dir.as_ref());
assert!(wal_files.is_empty());
@ -32,8 +34,8 @@ fn total_size() -> Result {
// Total size should be that of all the files
assert_eq!(wal.total_size(), helpers::total_size_on_disk(&dir.as_ref()));
// Write one WAL entry, and because the existing file is over the size limit, this entry
// should end up in a new WAL file
// Write one WAL entry, and because the existing file is over the size limit,
// this entry should end up in a new WAL file
create_and_sync_batch!(
wal,
["some more data, this should now be rolled over into the next WAL file"]
@ -54,7 +56,8 @@ fn total_size() -> Result {
// Pretend the process restarts
let wal = builder.wal()?;
// Total size should be that of all the files, so without the file deleted out-of-band
// Total size should be that of all the files, so without the file deleted
// out-of-band
assert_eq!(wal.total_size(), helpers::total_size_on_disk(&dir.as_ref()));
Ok(())

View File

@ -25,7 +25,8 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
/// Stores the actual data for columns in a partition along with summary statistics
/// Stores the actual data for columns in a partition along with summary
/// statistics
pub enum Column {
F64(Vec<Option<f64>>, Statistics<f64>),
I64(Vec<Option<i64>>, Statistics<i64>),
@ -185,8 +186,9 @@ impl Column {
}
}
// push_none_if_len_equal will add a None value to the end of the Vec of values if the
// length is equal to the passed in value. This is used to ensure columns are all the same length.
// push_none_if_len_equal will add a None value to the end of the Vec of values
// if the length is equal to the passed in value. This is used to ensure
// columns are all the same length.
pub fn push_none_if_len_equal(&mut self, len: usize) {
match self {
Self::F64(v, _) => {

View File

@ -383,7 +383,8 @@ impl TSDatabase for Db {
type Error = Error;
// TODO: write_lines creates a column named "time" for the timestamp data. If
// we keep this we need to validate that no tag or field has the same name.
// we keep this we need to validate that no tag or field has the same
// name.
async fn write_lines(&self, lines: &[ParsedLine<'_>]) -> Result<(), Self::Error> {
let data = split_lines_into_write_entry_partitions(partition_key, lines);
let batch = flatbuffers::get_root::<wb::WriteBufferBatch<'_>>(&data);
@ -411,8 +412,9 @@ impl TSDatabase for Db {
};
if let Some(wal) = &self.wal_details {
// TODO(paul): refactor this so we're not cloning. Although replicated writes shouldn't
// be using a WAL and how the WAL is used at all is likely to have a larger refactor soon.
// TODO(paul): refactor this so we're not cloning. Although replicated writes
// shouldn't be using a WAL and how the WAL is used at all is
// likely to have a larger refactor soon.
wal.write_and_sync(write.data.clone())
.await
.context(WritingWal {
@ -473,7 +475,8 @@ impl TSDatabase for Db {
}
}
/// return all field names in this database, while applying optional predicates
/// return all field names in this database, while applying optional
/// predicates
async fn field_column_names(&self, predicate: Predicate) -> Result<FieldListPlan, Self::Error> {
let mut filter = PartitionTableFilter::new(predicate);
let mut visitor = TableFieldPredVisitor::new();
@ -481,7 +484,8 @@ impl TSDatabase for Db {
Ok(visitor.into_fieldlist_plan())
}
/// return all column values in this database, while applying optional predicates
/// return all column values in this database, while applying optional
/// predicates
async fn column_values(
&self,
column_name: &str,
@ -732,7 +736,8 @@ impl Db {
/// functions, in order, of `visitor`, as described on the Visitor
/// trait.
///
/// Skips visiting any table or columns if `filter.should_visit_table` returns false
/// Skips visiting any table or columns if `filter.should_visit_table`
/// returns false
async fn visit_tables<V: Visitor>(
&self,
filter: &mut PartitionTableFilter,
@ -1209,9 +1214,9 @@ impl Visitor for WindowGroupsVisitor {
}
}
// partition_key returns the partition key for the given line. The key will be the prefix of a
// partition name (multiple partitions can exist for each key). It uses the user defined
// partitioning rules to construct this key
// partition_key returns the partition key for the given line. The key will be
// the prefix of a partition name (multiple partitions can exist for each key).
// It uses the user defined partitioning rules to construct this key
pub fn partition_key(line: &ParsedLine<'_>) -> String {
// TODO - wire this up to use partitioning rules, for now just partition by day
let ts = line.timestamp.unwrap();
@ -1256,9 +1261,10 @@ mod tests {
v.iter().map(|s| s.to_string()).collect::<BTreeSet<_>>()
}
// Abstract a bit of boilerplate around table assertions and improve the failure output.
// The default failure message uses Debug formatting, which prints newlines as `\n`.
// This prints the pretty_format_batches using Display so it's easier to read the tables.
// Abstract a bit of boilerplate around table assertions and improve the failure
// output. The default failure message uses Debug formatting, which prints
// newlines as `\n`. This prints the pretty_format_batches using Display so
// it's easier to read the tables.
fn assert_table_eq(table: &str, partitions: &[arrow::record_batch::RecordBatch]) {
let res = pretty_format_batches(partitions).unwrap();
assert_eq!(table, res, "\n\nleft:\n\n{}\nright:\n\n{}", table, res);

View File

@ -38,14 +38,16 @@ impl Dictionary {
symbol_to_u32(self.0.get_or_intern(value))
}
/// Returns the ID in self.dictionary that corresponds to `value`, if any. Returns an error if
/// no such value is found. Does not add the value to the dictionary.
/// Returns the ID in self.dictionary that corresponds to `value`, if any.
/// Returns an error if no such value is found. Does not add the value
/// to the dictionary.
pub fn lookup_value(&self, value: &str) -> Result<u32> {
self.id(value).context(DictionaryValueLookupError { value })
}
/// Returns the ID in self.dictionary that corresponds to `value`,
/// if any. No error is returned to avoid an allocation when no value is present
/// if any. No error is returned to avoid an allocation when no value is
/// present
pub fn id(&self, value: &str) -> Option<u32> {
self.0.get(value).map(symbol_to_u32)
}
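
A short sketch contrasting the two lookups described above, assuming `dict` has already had the value interned elsewhere:

fn lookups(dict: &Dictionary) {
    // Allocation-free probe: Option<u32>, no error machinery involved.
    if let Some(id) = dict.id("region") {
        println!("region -> {}", id);
    }

    // Same probe, but a missing value becomes an Error with context attached.
    let _id_or_error = dict.lookup_value("region");
}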

View File

@ -89,9 +89,9 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
pub struct Partition {
pub key: String,
/// `dictionary` maps &str -> u32. The u32s are used in place of String or str to avoid slow
/// string operations. The same dictionary is used for table names, tag names, tag values, and
/// column names.
/// `dictionary` maps &str -> u32. The u32s are used in place of String or
/// str to avoid slow string operations. The same dictionary is used for
/// table names, tag names, tag values, and column names.
// TODO: intern string field values too?
pub dictionary: Dictionary,
@ -261,7 +261,8 @@ impl Partition {
expr_to_column_names(&expr, &mut predicate_columns).unwrap();
}
// if there are any column references in the expression, ensure they appear in any table
// if there are any column references in the expression, ensure they appear in
// any table
let required_columns = if predicate_columns.is_empty() {
None
} else {
@ -291,7 +292,8 @@ impl Partition {
})
}
/// Adds the ids of any columns in additional_required_columns to the required columns of predicate
/// Adds the ids of any columns in additional_required_columns to the
/// required columns of predicate
pub fn add_required_columns_to_predicate(
&self,
additional_required_columns: &HashSet<String>,
@ -391,7 +393,8 @@ impl Partition {
Ok(table)
}
/// Translate a bunch of strings into a set of ids relative to this partition
/// Translate a bunch of strings into a set of ids relative to this
/// partition
pub fn make_partition_ids<'a, I>(&self, predicate_columns: I) -> PartitionIdSet
where
I: Iterator<Item = &'a String>,

View File

@ -40,8 +40,9 @@ impl WriteBufferDatabases {
}
}
/// wal_dirs will traverse the directories from the service base directory and return
/// the directories that contain WALs for databases, which can be used to restore those DBs.
/// wal_dirs will traverse the directories from the service base directory
/// and return the directories that contain WALs for databases, which
/// can be used to restore those DBs.
pub fn wal_dirs(&self) -> Result<Vec<PathBuf>> {
let entries = fs::read_dir(&self.base_dir).context(ReadError {
dir: &self.base_dir,
@ -99,7 +100,8 @@ impl DatabaseStore for WriteBufferDatabases {
// database doesn't exist yet so acquire the write lock and get or insert
let mut databases = self.databases.write().await;
// make sure it didn't get inserted by someone else while we were waiting for the write lock
// make sure it didn't get inserted by someone else while we were waiting for
// the write lock
if let Some(db) = databases.get(name) {
return Ok(db.clone());
}
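
The re-check after acquiring the write lock above is the usual read-then-write double check with an async RwLock. A generic, self-contained sketch of the pattern (plain std/tokio types, not the IOx ones):

use std::{collections::HashMap, sync::Arc};
use tokio::sync::RwLock;

// Take the cheap read lock first; re-check after upgrading to the write lock
// because another task may have inserted while we were waiting.
async fn get_or_create(map: &RwLock<HashMap<String, Arc<String>>>, name: &str) -> Arc<String> {
    if let Some(v) = map.read().await.get(name) {
        return Arc::clone(v);
    }
    let mut write = map.write().await;
    if let Some(v) = write.get(name) {
        return Arc::clone(v); // someone else won the race
    }
    let v = Arc::new(name.to_string());
    write.insert(name.to_string(), Arc::clone(&v));
    v
}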

View File

@ -176,7 +176,8 @@ pub struct Table {
/// Name of the table as a u32 in the partition dictionary
pub id: u32,
/// Maps column name (as a u32 in the partition dictionary) to an index in self.columns
/// Maps column name (as a u32 in the partition dictionary) to an index in
/// self.columns
pub column_id_to_index: HashMap<u32, usize>,
/// Actual column storage
@ -313,7 +314,8 @@ impl Table {
.column_id_to_index
.iter()
.filter_map(|(&column_id, &column_index)| {
// keep tag columns and the timestamp column, if needed to evaluate a timestamp predicate
// keep tag columns and the timestamp column, if needed to evaluate a timestamp
// predicate
let need_column = if let Column::Tag(_, _) = self.columns[column_index] {
true
} else {
@ -421,7 +423,8 @@ impl Table {
.context(BuildingPlan)
}
/// Creates a SeriesSet plan that produces an output table with rows that match the predicate
/// Creates a SeriesSet plan that produces an output table with rows that
/// match the predicate
///
/// The output looks like:
/// (tag_col1, tag_col2, ... field1, field2, ... timestamp)
@ -439,8 +442,8 @@ impl Table {
self.series_set_plan_impl(partition_predicate, None, partition)
}
/// Creates the plans for computing series set, pulling prefix_columns, if any, as a prefix of the ordering
/// The created plan looks like:
/// Creates the plans for computing series set, pulling prefix_columns, if
/// any, as a prefix of the ordering. The created plan looks like:
///
/// Projection (select the columns columns needed)
/// Order by (tag_columns, timestamp_column)
@ -518,10 +521,12 @@ impl Table {
Arc::new(table_name)
}
/// Creates a GroupedSeriesSet plan that produces an output table with rows that match the predicate
/// Creates a GroupedSeriesSet plan that produces an output table with rows
/// that match the predicate
///
/// The output looks like:
/// (group_tag_column1, group_tag_column2, ... tag_col1, tag_col2, ... field1, field2, ... timestamp)
/// (group_tag_column1, group_tag_column2, ... tag_col1, tag_col2, ...
/// field1, field2, ... timestamp)
///
/// The order of the tag_columns is ordered by name.
///
@ -576,7 +581,6 @@ impl Table {
/// GroupBy(gby: tag columns, window_function; agg: aggregate(field)
/// Filter(predicate)
/// InMemoryScan
///
pub fn window_grouped_series_set_plan(
&self,
partition_predicate: &PartitionPredicate,
@ -1319,7 +1323,8 @@ mod tests {
#[tokio::test]
async fn test_series_set_plan_order() {
// test that the columns and rows come out in the right order (tags then timestamp)
// test that the columns and rows come out in the right order (tags then
// timestamp)
let mut partition = Partition::new("dummy_partition_key");
let dictionary = &mut partition.dictionary;
@ -1684,7 +1689,8 @@ mod tests {
.collect()
}
// returns the error string or panics if `reorder_prefix` doesn't return an error
// returns the error string or panics if `reorder_prefix` doesn't return an
// error
fn reorder_prefix_err(prefix: &[&str], table_columns: &[&str]) -> String {
let prefix = prefix.iter().map(|s| s.to_string()).collect::<Vec<_>>();
let table_columns =