Merge remote-tracking branch 'origin/main' into er/fix/flux/2691

pull/24376/head
Andrew Lamb 2021-10-25 13:41:08 -04:00
commit 7cd56cbc56
99 changed files with 1294 additions and 998 deletions

Cargo.lock (generated)

@ -283,9 +283,9 @@ dependencies = [
[[package]]
name = "backtrace"
version = "0.3.61"
version = "0.3.62"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7a905d892734eea339e896738c14b9afce22b5318f64b951e70bf3844419b01"
checksum = "091bcdf2da9950f96aa522681ce805e6857f6ca8df73833d35736ab2dc78e152"
dependencies = [
"addr2line",
"cc",
@ -1429,9 +1429,9 @@ dependencies = [
[[package]]
name = "http-body"
version = "0.4.3"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "399c583b2979440c60be0821a6199eca73bc3c8dcd9d070d75ac726e2c6186e5"
checksum = "1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6"
dependencies = [
"bytes",
"http",
@ -1458,9 +1458,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "hyper"
version = "0.14.13"
version = "0.14.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15d1cfb9e4f68655fa04c01f59edb405b6074a0f7118ea881e5026e4a1cd8593"
checksum = "2b91bb1f221b6ea1f1e4371216b70f40748774c2fb5971b450c07773fb92d26b"
dependencies = [
"bytes",
"futures-channel",
@ -1931,9 +1931,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.104"
version = "0.2.105"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b2f96d100e1cf1929e7719b7edb3b90ab5298072638fccd77be9ce942ecdfce"
checksum = "869d572136620d55835903746bcb5cdc54cb2851fd0aeec53220b4bb65ef3013"
[[package]]
name = "libloading"
@ -2035,9 +2035,9 @@ checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d"
[[package]]
name = "matchers"
version = "0.0.1"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f099785f7595cc4b4553a174ce30dd7589ef93391ff414dbb67f62392b9e0ce1"
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
dependencies = [
"regex-automata",
]
@ -2219,6 +2219,7 @@ dependencies = [
"bytes",
"criterion",
"flate2",
"hashbrown",
"influxdb_line_protocol",
"mutable_batch",
"schema",
@ -2504,9 +2505,9 @@ dependencies = [
[[package]]
name = "object"
version = "0.26.2"
version = "0.27.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39f37e50073ccad23b6d09bcb5b263f4e76d3bb6038e4a3c08e52162ffa8abc2"
checksum = "67ac1d3f9a1d3616fd9a60c8d74296f22406a238b6a72f5cc1e6f314df4ffbf9"
dependencies = [
"memchr",
]
@ -4518,9 +4519,9 @@ dependencies = [
[[package]]
name = "tower"
version = "0.4.9"
version = "0.4.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d15a6b60cdff0cb039d81d3b37f8bc3d7e53dca09069aae3ef2502ca4834fe30"
checksum = "c00e500fff5fa1131c866b246041a6bf96da9c965f8fe4128cb1421f23e93c00"
dependencies = [
"futures-core",
"futures-util",
@ -4658,12 +4659,11 @@ dependencies = [
[[package]]
name = "tracing-subscriber"
version = "0.2.25"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e0d2eaa99c3c2e41547cfa109e910a68ea03823cccad4a0525dcbc9b01e8c71"
checksum = "5cf865b5ddc38e503a29c41c4843e616a73028ae18c637bc3eb2afaef4909c84"
dependencies = [
"ansi_term 0.12.1",
"chrono",
"lazy_static",
"matchers",
"regex",


@ -2,7 +2,7 @@
name = "influxdb_iox"
version = "0.1.0"
authors = ["Paul Dix <paul@pauldix.net>"]
edition = "2018"
edition = "2021"
default-run = "influxdb_iox"
readme = "README.md"
@ -94,7 +94,7 @@ data_types = { path = "data_types" }
entry = { path = "entry" }
generated_types = { path = "generated_types" }
influxdb_iox_client = { path = "influxdb_iox_client", features = ["format"] }
influxdb_iox_client = { path = "influxdb_iox_client", features = ["flight", "format"] }
influxdb_line_protocol = { path = "influxdb_line_protocol" }
internal_types = { path = "internal_types" }
iox_object_store = { path = "iox_object_store" }


@ -2,7 +2,7 @@
name = "arrow_util"
version = "0.1.0"
authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
edition = "2018"
edition = "2021"
description = "Apache Arrow utilities"
[dependencies]


@ -29,6 +29,12 @@ impl BitSet {
bitset
}
/// Reserve space for `count` further bits
pub fn reserve(&mut self, count: usize) {
let new_buf_len = (self.len + count + 7) >> 3;
self.buffer.reserve(new_buf_len);
}
/// Appends `count` unset bits
pub fn append_unset(&mut self, count: usize) {
self.len += count;
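A minimal usage sketch of the new `reserve` method (not from this commit; `BitSet::new` and the `arrow_util::bitset` module path are assumptions, only `reserve` and `append_unset` appear in the hunk above):

use arrow_util::bitset::BitSet;

fn preallocate_example() {
    let mut bits = BitSet::new();
    // Reserve room for 1000 further bits; the requested count is rounded up to
    // whole bytes, so the backing buffer grows at most once here...
    bits.reserve(1000);
    // ...and the subsequent append should not need to reallocate.
    bits.append_unset(1000);
}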


@ -3,7 +3,7 @@ name = "client_util"
version = "0.1.0"
authors = ["Raphael Taylor-Davies <r.taylordavies@googlemail.com>"]
description = "Shared code for IOx clients"
edition = "2018"
edition = "2021"
[dependencies]
http = "0.2.3"
@ -13,4 +13,4 @@ tonic = { version = "0.5.0" }
tower = "0.4"
[dev-dependencies]
tokio = { version = "1.11", features = ["macros"] }
tokio = { version = "1.11", features = ["macros"] }


@ -3,7 +3,7 @@ name = "data_types"
version = "0.1.0"
authors = ["pauldix <paul@pauldix.net>"]
description = "InfluxDB IOx data_types, shared between IOx instances and IOx clients"
edition = "2018"
edition = "2021"
readme = "README.md"
[dependencies] # In alphabetical order


@ -2,7 +2,7 @@
name = "datafusion"
version = "0.1.0"
authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
edition = "2018"
edition = "2021"
description = "Re-exports datafusion at a specific version"
[dependencies]


@ -2,7 +2,7 @@
name = "datafusion_util"
version = "0.1.0"
authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
edition = "2018"
edition = "2021"
description = "Datafusion utilities"
[dependencies]


@ -2,7 +2,7 @@
name = "entry"
version = "0.1.0"
authors = ["Paul Dix <paul@pauldix.net>"]
edition = "2018"
edition = "2021"
description = "The entry format used by the write buffer"
[dependencies]


@ -2,7 +2,7 @@
name = "generated_types"
version = "0.1.0"
authors = ["Paul Dix <paul@pauldix.net>"]
edition = "2018"
edition = "2021"
[dependencies] # In alphabetical order
bytes = "1.0"


@ -21,38 +21,44 @@ fn main() -> Result<()> {
/// - `com.github.influxdata.idpe.storage.read.rs`
/// - `influxdata.iox.catalog.v1.rs`
/// - `influxdata.iox.management.v1.rs`
/// - `influxdata.iox.router.v1.rs`
/// - `influxdata.iox.write.v1.rs`
/// - `influxdata.platform.storage.rs`
fn generate_grpc_types(root: &Path) -> Result<()> {
let storage_path = root.join("influxdata/platform/storage");
let idpe_path = root.join("com/github/influxdata/idpe/storage/read");
let catalog_path = root.join("influxdata/iox/catalog/v1");
let idpe_path = root.join("com/github/influxdata/idpe/storage/read");
let management_path = root.join("influxdata/iox/management/v1");
let router_path = root.join("influxdata/iox/router/v1");
let storage_path = root.join("influxdata/platform/storage");
let write_path = root.join("influxdata/iox/write/v1");
let proto_files = vec![
storage_path.join("test.proto"),
storage_path.join("predicate.proto"),
storage_path.join("storage_common.proto"),
storage_path.join("service.proto"),
storage_path.join("storage_common_idpe.proto"),
idpe_path.join("source.proto"),
catalog_path.join("catalog.proto"),
catalog_path.join("parquet_metadata.proto"),
catalog_path.join("predicate.proto"),
management_path.join("database_rules.proto"),
idpe_path.join("source.proto"),
management_path.join("chunk.proto"),
management_path.join("database_rules.proto"),
management_path.join("jobs.proto"),
management_path.join("partition.proto"),
management_path.join("partition_template.proto"),
management_path.join("server_config.proto"),
management_path.join("service.proto"),
management_path.join("shard.proto"),
management_path.join("jobs.proto"),
write_path.join("service.proto"),
root.join("influxdata/pbdata/v1/influxdb_pb_data_protocol.proto"),
root.join("grpc/health/v1/service.proto"),
management_path.join("write_buffer.proto"),
root.join("google/longrunning/operations.proto"),
root.join("google/rpc/error_details.proto"),
root.join("google/rpc/status.proto"),
root.join("grpc/health/v1/service.proto"),
root.join("influxdata/pbdata/v1/influxdb_pb_data_protocol.proto"),
router_path.join("router.proto"),
router_path.join("service.proto"),
storage_path.join("predicate.proto"),
storage_path.join("service.proto"),
storage_path.join("storage_common.proto"),
storage_path.join("storage_common_idpe.proto"),
storage_path.join("test.proto"),
write_path.join("service.proto"),
];
// Tell cargo to recompile if any of these proto files are changed


@ -3,34 +3,9 @@ package influxdata.iox.management.v1;
option go_package = "github.com/influxdata/iox/management/v1";
import "google/protobuf/duration.proto";
import "google/protobuf/empty.proto";
import "influxdata/iox/management/v1/partition_template.proto";
import "influxdata/iox/management/v1/shard.proto";
// `PartitionTemplate` is used to compute the partition key of each row that
// gets written. It can consist of the table name, a column name and its value,
// a formatted time, or a string column and regex captures of its value. For
// columns that do not appear in the input row, a blank value is output.
//
// The key is constructed in order of the template parts; thus ordering changes
// what partition key is generated.
message PartitionTemplate {
message Part {
message ColumnFormat {
string column = 1;
string format = 2;
}
oneof part {
google.protobuf.Empty table = 1;
string column = 2;
string time = 3;
ColumnFormat regex = 4;
ColumnFormat strf_time = 5;
}
}
repeated Part parts = 1;
}
import "influxdata/iox/management/v1/write_buffer.proto";
message LifecycleRules {
// Once the total amount of buffered data in memory reaches this size start
@ -111,6 +86,9 @@ message LifecycleRules {
uint64 parquet_cache_limit = 17;
}
// Database rules.
//
// TODO(marco): add `WriteSources` to this message.
message DatabaseRules {
// The unencoded name of the database
//
@ -128,6 +106,8 @@ message DatabaseRules {
LifecycleRules lifecycle_rules = 3;
// If not specified, does not configure any routing
//
// TODO(marco): remove this
oneof routing_rules {
// Shard config
ShardConfig shard_config = 8;
@ -146,6 +126,8 @@ message DatabaseRules {
// Optionally, the connection for the write buffer for writing or reading/restoring data.
//
// If not specified, does not configure a write buffer
//
// TODO(marco): remove this
WriteBufferConnection write_buffer_connection = 13;
}
@ -158,61 +140,6 @@ message PersistedDatabaseRules {
DatabaseRules rules = 2;
}
// Configures the use of a write buffer.
message WriteBufferConnection {
enum Direction {
// Unspecified direction, will be treated as an error.
DIRECTION_UNSPECIFIED = 0;
// Writes into the buffer aka "producer".
DIRECTION_WRITE = 1;
// Reads from the buffer aka "consumer".
DIRECTION_READ = 2;
}
// If the buffer is used for reading or writing.
Direction direction = 1;
// Which type should be used (e.g. "kafka", "mock")
string type = 2;
// Connection string, depends on `type`.
string connection = 3;
// Old non-nested auto-creation config.
reserved 4, 5, 7;
// Special configs to be applied when establishing the connection.
//
// This depends on `type` and can configure aspects like timeouts.
map<string, string> connection_config = 6;
// Specifies if the sequencers (e.g. for Kafka in the form of a topic w/ `n_sequencers` partitions) should be
// automatically created if they do not exist prior to reading or writing.
WriteBufferCreationConfig creation_config = 8;
}
// Configs sequencer auto-creation for write buffers.
//
// What that means depends on the used write buffer, e.g. for Kafka this will create a new topic w/ `n_sequencers`
// partitions.
message WriteBufferCreationConfig {
// Number of sequencers.
//
// How they are implemented depends on `type`, e.g. for Kafka this is mapped to the number of partitions.
//
// If 0, a server-side default is used
uint32 n_sequencers = 1;
// Special configs to be applied when sequencers are created.
//
// This depends on `type` and can set up parameters like retention policy.
//
// Contains 0 or more key value pairs
map<string, string> options = 2;
}
message RoutingConfig {
Sink sink = 2;
}


@ -0,0 +1,31 @@
syntax = "proto3";
package influxdata.iox.management.v1;
option go_package = "github.com/influxdata/iox/management/v1";
import "google/protobuf/empty.proto";
// `PartitionTemplate` is used to compute the partition key of each row that
// gets written. It can consist of the table name, a column name and its value,
// a formatted time, or a string column and regex captures of its value. For
// columns that do not appear in the input row, a blank value is output.
//
// The key is constructed in order of the template parts; thus ordering changes
// what partition key is generated.
message PartitionTemplate {
message Part {
message ColumnFormat {
string column = 1;
string format = 2;
}
oneof part {
google.protobuf.Empty table = 1;
string column = 2;
string time = 3;
ColumnFormat regex = 4;
ColumnFormat strf_time = 5;
}
}
repeated Part parts = 1;
}
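A hypothetical illustration of the ordering rule documented above (not from this commit; the row, the template, and the `-` separator are invented, only the part kinds come from the message definition):

fn partition_key_example() {
    // Template [table, column "region", strf_time "%Y-%m-%d"] applied to a point
    // from table `cpu` with `region=west`, written on 2021-10-25.
    let parts = [
        "cpu",        // Part.table     -> the table name
        "west",       // Part.column    -> that column's value (blank if absent)
        "2021-10-25", // Part.strf_time -> the formatted row timestamp
    ];
    let partition_key = parts.join("-"); // the joining separator is an assumption
    assert_eq!(partition_key, "cpu-west-2021-10-25");
}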


@ -37,10 +37,7 @@ service ManagementService {
rpc RestoreDatabase(RestoreDatabaseRequest) returns (RestoreDatabaseResponse);
// List deleted databases and their metadata.
rpc ListDeletedDatabases(ListDeletedDatabasesRequest) returns (ListDeletedDatabasesResponse);
// List all databases and their metadata.
// List databases with their metadata.
rpc ListDetailedDatabases(ListDetailedDatabasesRequest) returns (ListDetailedDatabasesResponse);
// List chunks available on this database
@ -189,12 +186,6 @@ message RestoreDatabaseRequest {
message RestoreDatabaseResponse {}
message ListDeletedDatabasesRequest {}
message ListDeletedDatabasesResponse {
repeated DetailedDatabase deleted_databases = 1;
}
message ListDetailedDatabasesRequest {}
message ListDetailedDatabasesResponse {


@ -30,10 +30,14 @@ message ShardConfig {
/// If set to true the router will ignore any errors sent by the remote
/// targets in this route. That is, the write request will succeed
/// regardless of this route's success.
///
/// TODO(marco): remove this
bool ignore_errors = 3;
/// Mapping between shard IDs and node groups. Other sharding rules use
/// ShardId as targets.
///
/// TODO(marco): remove this
map<uint32, Sink> shards = 4;
}


@ -0,0 +1,58 @@
syntax = "proto3";
package influxdata.iox.management.v1;
option go_package = "github.com/influxdata/iox/management/v1";
// Configures the use of a write buffer.
message WriteBufferConnection {
enum Direction {
// Unspecified direction, will be treated as an error.
DIRECTION_UNSPECIFIED = 0;
// Writes into the buffer aka "producer".
DIRECTION_WRITE = 1;
// Reads from the buffer aka "consumer".
DIRECTION_READ = 2;
}
// If the buffer is used for reading or writing.
Direction direction = 1;
// Which type should be used (e.g. "kafka", "mock")
string type = 2;
// Connection string, depends on `type`.
string connection = 3;
// Old non-nested auto-creation config.
reserved 4, 5, 7;
// Special configs to be applied when establishing the connection.
//
// This depends on `type` and can configure aspects like timeouts.
map<string, string> connection_config = 6;
// Specifies if the sequencers (e.g. for Kafka in the form of a topic w/ `n_sequencers` partitions) should be
// automatically created if they do not exist prior to reading or writing.
WriteBufferCreationConfig creation_config = 8;
}
// Configs sequencer auto-creation for write buffers.
//
// What that means depends on the used write buffer, e.g. for Kafka this will create a new topic w/ `n_sequencers`
// partitions.
message WriteBufferCreationConfig {
// Number of sequencers.
//
// How they are implemented depends on `type`, e.g. for Kafka this is mapped to the number of partitions.
//
// If 0, a server-side default is used
uint32 n_sequencers = 1;
// Special configs to be applied when sequencers are created.
//
// This depends on `type` and can set up parameters like retention policy.
//
// Contains 0 or more key value pairs
map<string, string> options = 2;
}
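A sketch of how a Kafka-backed connection might be populated from Rust (not from this commit; the `generated_types` module path, the nested `write_buffer_connection::Direction` location produced by prost, and the example values are assumptions derived from the messages above):

use std::collections::HashMap;
use generated_types::influxdata::iox::management::v1::{
    write_buffer_connection::Direction, WriteBufferConnection, WriteBufferCreationConfig,
};

fn kafka_connection_example() -> WriteBufferConnection {
    WriteBufferConnection {
        direction: Direction::Write as i32,      // acting as a producer
        r#type: "kafka".to_string(),
        connection: "my-kafka:9092".to_string(), // connection string depends on `type`
        connection_config: HashMap::new(),
        creation_config: Some(WriteBufferCreationConfig {
            n_sequencers: 2,                     // for Kafka: number of partitions
            options: HashMap::new(),
        }),
    }
}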


@ -0,0 +1,148 @@
syntax = "proto3";
package influxdata.iox.router.v1;
option go_package = "github.com/influxdata/iox/router/v1";
import "influxdata/iox/management/v1/partition_template.proto";
import "influxdata/iox/management/v1/shard.proto";
import "influxdata/iox/management/v1/write_buffer.proto";
// Router for writes and queries.
//
// A router acts similarly to a "real" database except that it does NOT store or manage any data by itself but forwards
// this responsibility to other nodes (which then in turn provide an actual database or another routing layer).
//
// # Write Routing
//
// ## Overall Picture
// Data is accepted from all sources, is sharded, and is (according to the sharding) written into the sink sets. There
// may be a prioritization of sources, e.g. "HTTP and gRPC first, then write buffers in declared order".
//
// ```text
// ( HTTP )--+ +------->( sink set 1 )
// | |
// ( gRPC )--+-->( sharder )--> ...
// | |
// ( Write Buffer 1 )--+ +------->( sink set n )
// ... |
// ( Write Buffer n )--+
// ```
//
// ## Sharder
// A sharder takes data and for every row/line:
//
// 1. Checks if a matcher matches the row/line; the first matcher wins. If so, the row/line is sent directly to that
//    matcher's sink set.
// 2. If no matcher matches, the row/line is handled by the hash ring.
//
// ```text
// --->[ matcher 1? ]-{no}---...--->[ matcher n? ]-{no}---+
// | | |
// {yes} {yes} |
// | | |
// V V |
// ( sink set 1 ) ( sink set n ) |
// ^ ^ |
// | | |
// +--------( hash ring )-------+ |
// ^ |
// | |
// +-----------------------------+
// ```
//
// ## Sink Set
// Data is written to all sinks in the set in implementation-defined order. Errors do NOT short-circuit. If an error
// occurs for at least one sink that has `ignore_errors = false`, an error is returned. An empty sink set acts as a NULL
// sink and always succeeds.
//
// **IMPORTANT: Queries are NOT distributed! They are always answered by a single node.**
//
// # Query Routing
// Queries always arrive via gRPC and are forwarded to one sink. The specific sink is selected via an engine that might
// take the following features into account:
//
// - **freshness:** For each sink, what are the latest sequence numbers pulled from the write buffer?
// - **stickiness:** The same client should ideally reach the same sink in subsequent requests to improve caching.
// - **circuit breaking:** If a sink is unhealthy, it should be excluded from the candidate list for a while.
//
// ```text
// ( gRPC )-->[ selection engine ]-->( sink 1 )
// | ...
// +---->( sink n )
// ```
message Router {
// Router name.
//
// The name is unique for this node.
string name = 1;
// Sources of write requests.
WriteSources write_sources = 2;
// Write sharder.
//
// NOTE: This only uses the `specific_targets` and `hash_ring` config of the sharder. The other fields are ignored.
//
// TODO(marco): remove the note above once the `ShardConfig` has been cleaned up.
influxdata.iox.management.v1.ShardConfig write_sharder = 3;
// Sinks for write requests.
map<uint32, WriteSinkSet> write_sinks = 4;
// Sinks for query requests.
QuerySinks query_sinks = 5;
// Template that generates a partition key for each row inserted into the database.
//
// This is a temporary config until partitioning is moved entirely into the database.
//
// If not specified, a server-side default is used
//
// TODO(marco): remove this
influxdata.iox.management.v1.PartitionTemplate partition_template = 6;
}
// Sources of write requests, aka new data.
//
// Data is accepted from these sources and a status is provided back to them.
message WriteSources {
// If set, writes via gRPC and HTTP are accepted.
//
// You may want to disable this when incoming data should solely be received via write buffer(s).
bool allow_unsequenced_inputs = 2;
// Write buffer connections.
repeated influxdata.iox.management.v1.WriteBufferConnection write_buffers = 3;
}
// Sink for write requests, aka new data.
//
// Data is sent to this sink and a status is received from it.
message WriteSink {
// Where the data goes.
oneof sink {
// gRPC-based remote, addressed by its server ID.
uint32 grpc_remote = 1;
// Write buffer connection.
influxdata.iox.management.v1.WriteBufferConnection write_buffer = 2;
}
// If set, errors during writing to this sink are ignored and do NOT lead to an overall failure.
bool ignore_errors = 3;
}
// Set of write sinks.
message WriteSinkSet {
// Sinks within the set.
repeated WriteSink sinks = 1;
}
// Sinks for query requests.
//
// Queries are sent to one of these sinks and the resulting data is received from it.
//
// Note that the query results flow in the opposite direction (aka a query sink is a result source).
message QuerySinks {
// gRPC-based remotes, addressed by their server IDs.
repeated uint32 grpc_remotes = 1;
}
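To tie the "Sink Set" semantics above to these messages, a sketch using the prost-generated Rust types (not from this commit; the `generated_types` module path and the nested `write_sink::Sink` oneof name are assumptions):

use generated_types::influxdata::iox::router::v1::{write_sink::Sink, WriteSink, WriteSinkSet};

fn sink_set_example() -> WriteSinkSet {
    WriteSinkSet {
        sinks: vec![
            // A failure of this gRPC remote fails the whole write...
            WriteSink {
                sink: Some(Sink::GrpcRemote(42)), // server ID of the remote
                ignore_errors: false,
            },
            // ...while failures of this write-buffer sink are ignored.
            WriteSink {
                sink: Some(Sink::WriteBuffer(Default::default())),
                ignore_errors: true,
            },
        ],
    }
}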


@ -0,0 +1,76 @@
syntax = "proto3";
package influxdata.iox.router.v1;
option go_package = "github.com/influxdata/iox/router/v1";
import "influxdata/iox/router/v1/router.proto";
service RouterService {
// List remote IOx servers we know about.
rpc ListRemotes(ListRemotesRequest) returns (ListRemotesResponse);
// Update information about a remote IOx server (upsert).
rpc UpdateRemote(UpdateRemoteRequest) returns (UpdateRemoteResponse);
// Delete a reference to a remote IOx server.
rpc DeleteRemote(DeleteRemoteRequest) returns (DeleteRemoteResponse);
// List configured routers.
rpc ListRouter(ListRouterRequest) returns (ListRouterResponse);
// Update router config (upsert).
rpc UpdateRouter(UpdateRouterRequest) returns (UpdateRouterResponse);
// Delete router.
rpc DeleteRouter(DeleteRouterRequest) returns (DeleteRouterResponse);
}
message ListRemotesRequest {}
message ListRemotesResponse {
repeated Remote remotes = 1;
}
// This resource represents a remote IOx server.
message Remote {
// The server ID associated with a remote IOx server.
uint32 id = 1;
// The address of the remote IOx server gRPC endpoint.
string connection_string = 2;
}
// Updates information about a remote IOx server.
//
// If a remote for a given `id` already exists, it is updated in place.
message UpdateRemoteRequest {
// If omitted, the remote associated with `id` will be removed.
Remote remote = 1;
// TODO(#917): add an optional flag to test the connection or not before adding it.
}
message UpdateRemoteResponse {}
message ListRouterRequest {}
message ListRouterResponse {
repeated Router routers = 1;
}
message DeleteRemoteRequest {
uint32 id = 1;
}
message DeleteRemoteResponse {}
message UpdateRouterRequest {
Router router = 1;
}
message UpdateRouterResponse {}
message DeleteRouterRequest {
string router_name = 1;
}
message DeleteRouterResponse {}


@ -2,7 +2,7 @@
name = "grpc-router"
version = "0.1.0"
authors = ["Marko Mikulicic <mkm@influxdata.com>"]
edition = "2018"
edition = "2021"
[dependencies]
bytes = "1.0"


@ -2,7 +2,7 @@
name = "grpc-router-test-gen"
version = "0.1.0"
authors = ["Marko Mikulicic <mkm@influxdata.com>"]
edition = "2018"
edition = "2021"
description = "Protobuf used in test for the grpc-router crate; need to be in a separate create because of linter limitations"
[dependencies]


@ -2,7 +2,7 @@
name = "influxdb2_client"
version = "0.1.0"
authors = ["Paul Dix <paul@pauldix.net>"]
edition = "2018"
edition = "2021"
[dependencies] # In alphabetical order
bytes = "1.0"


@ -2,11 +2,11 @@
name = "influxdb_iox_client"
version = "0.1.0"
authors = ["Dom Dwyer <dom@itsallbroken.com>"]
edition = "2018"
edition = "2021"
[features]
flight = ["arrow", "arrow-flight", "arrow_util", "serde/derive", "serde_json", "futures-util"]
format = ["arrow"]
format = ["arrow", "arrow_util"]
[dependencies]
# Workspace dependencies, in alphabetical order


@ -602,22 +602,7 @@ impl Client {
Ok(names)
}
/// List deleted databases and metadata
pub async fn list_deleted_databases(
&mut self,
) -> Result<Vec<DetailedDatabase>, ListDatabaseError> {
let response = self
.inner
.list_deleted_databases(ListDeletedDatabasesRequest {})
.await
.map_err(|status| match status.code() {
tonic::Code::Unavailable => ListDatabaseError::Unavailable(status),
_ => ListDatabaseError::ServerError(status),
})?;
Ok(response.into_inner().deleted_databases)
}
/// List all databases and detailed metadata
/// List databases and detailed metadata
pub async fn list_detailed_databases(
&mut self,
) -> Result<Vec<DetailedDatabase>, ListDatabaseError> {


@ -2,7 +2,7 @@
name = "influxdb_line_protocol"
version = "0.1.0"
authors = ["Paul Dix <paul@pauldix.net>"]
edition = "2018"
edition = "2021"
[dependencies] # In alphabetical order
nom = "7"
@ -11,4 +11,4 @@ snafu = "0.6.2"
observability_deps = { path = "../observability_deps" }
[dev-dependencies] # In alphabetical order
test_helpers = { path = "../test_helpers" }
test_helpers = { path = "../test_helpers" }


@ -2,7 +2,7 @@
name = "influxdb_storage_client"
version = "0.1.0"
authors = ["Raphael Taylor-Davies <r.taylordavies@googlemail.com>"]
edition = "2018"
edition = "2021"
[dependencies]
client_util = { path = "../client_util" }
@ -11,4 +11,4 @@ prost = "0.8"
tonic = { version = "0.5.0" }
futures-util = { version = "0.3.1" }
[dev-dependencies]
[dev-dependencies]


@ -2,7 +2,7 @@
name = "influxdb_tsm"
version = "0.1.0"
authors = ["Edd Robinson <me@edd.io>"]
edition = "2018"
edition = "2021"
[dependencies] # In alphabetical order
integer-encoding = "3.0.2"


@ -2,7 +2,7 @@
name = "internal_types"
version = "0.1.0"
authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
edition = "2018"
edition = "2021"
description = "InfluxDB IOx internal types, shared between IOx instances"
readme = "README.md"


@ -2,7 +2,7 @@
name = "iox_data_generator"
version = "0.1.0"
authors = ["Paul Dix <paul@pauldix.net>"]
edition = "2018"
edition = "2021"
default-run = "iox_data_generator"
[dependencies]
@ -26,7 +26,7 @@ snafu = "0.6.8"
tokio = { version = "1.11", features = ["macros", "rt-multi-thread"] }
toml = "0.5.6"
tracing = "0.1"
tracing-subscriber = "0.2.25"
tracing-subscriber = "0.3.0"
uuid = { version = "0.8.1", default_features = false }
[dev-dependencies]


@ -1,7 +1,7 @@
[package]
name = "iox_object_store"
version = "0.1.0"
edition = "2018"
edition = "2021"
description = "IOx-specific semantics wrapping the general-purpose object store crate"
[dependencies]


@ -114,8 +114,6 @@ impl Generation {
impl IoxObjectStore {
/// Get the data for the server config to determine the names and locations of the databases
/// that this server owns.
// TODO: this is in the process of replacing list_possible_databases for the floating databases
// design
pub async fn get_server_config_file(inner: &ObjectStore, server_id: ServerId) -> Result<Bytes> {
let path = paths::server_config_path(inner, server_id);
let mut stream = inner.get(&path).await?;
@ -155,62 +153,7 @@ impl IoxObjectStore {
RootPath::new(inner, server_id, database_name)
}
/// List database names and their locations in object storage that need to be further checked
/// for generations and whether they're marked as deleted or not.
// TODO: this is in the process of being deprecated in favor of get_server_config_file
pub async fn list_possible_databases(
inner: &ObjectStore,
server_id: ServerId,
) -> Result<Vec<(DatabaseName<'static>, String)>> {
let path = paths::all_databases_path(inner, server_id);
let list_result = inner.list_with_delimiter(&path).await?;
Ok(list_result
.common_prefixes
.into_iter()
.filter_map(|prefix| {
let prefix_parsed: DirsAndFileName = prefix.clone().into();
let last = prefix_parsed
.directories
.last()
.expect("path can't be empty");
let db_name = DatabaseName::new(last.encoded().to_string())
.log_if_error("invalid database directory")
.ok()?;
Some((db_name, prefix.to_raw()))
})
.collect())
}
/// List databases marked as deleted in object storage along with their generation IDs and
/// when they were deleted. Enables a user to choose a generation for a database that they
/// would want to restore or would want to delete permanently.
pub async fn list_deleted_databases(
inner: &ObjectStore,
server_id: ServerId,
) -> Result<Vec<DetailedDatabase>> {
Ok(Self::list_all_databases(inner, server_id)
.await?
.into_iter()
.flat_map(|(name, generations)| {
let name = Arc::new(name);
generations.into_iter().filter_map(move |gen| {
let name = Arc::clone(&name);
gen.deleted_at.map(|_| DetailedDatabase {
name: (*name).clone(),
generation_id: gen.id,
deleted_at: gen.deleted_at,
})
})
})
.collect())
}
/// List all databases in object storage along with their generation IDs and if/when they
/// were deleted. Useful for visibility into object storage and finding databases to restore or
/// permanently delete.
/// List this server's databases in object storage along with their generation IDs.
pub async fn list_detailed_databases(
inner: &ObjectStore,
server_id: ServerId,
@ -234,8 +177,7 @@ impl IoxObjectStore {
/// List database names in object storage along with all existing generations for each database
/// and whether the generations are marked as deleted or not. Useful for finding candidates
/// to restore or to permanently delete. Makes many more calls to object storage than
/// [`IoxObjectStore::list_possible_databases`].
/// to restore or to permanently delete. Makes many calls to object storage.
async fn list_all_databases(
inner: &ObjectStore,
server_id: ServerId,
@ -1143,50 +1085,6 @@ mod tests {
iox_object_store.write_tombstone().await.unwrap();
}
#[tokio::test]
async fn list_possible_databases_returns_all_potential_databases() {
let object_store = make_object_store();
let server_id = make_server_id();
// Create a normal database, will be in the list
let db_normal = DatabaseName::new("db_normal").unwrap();
create_database(Arc::clone(&object_store), server_id, &db_normal).await;
// Create a database, then delete it - will still be in the list
let db_deleted = DatabaseName::new("db_deleted").unwrap();
let db_deleted_iox_store =
create_database(Arc::clone(&object_store), server_id, &db_deleted).await;
delete_database(&db_deleted_iox_store).await;
// Put a file in a directory that looks like a database directory but has no rules,
// will still be in the list
let not_a_db = DatabaseName::new("not_a_db").unwrap();
let mut not_rules_path = object_store.new_path();
not_rules_path.push_all_dirs(&[&server_id.to_string(), not_a_db.as_str(), "0"]);
not_rules_path.set_file_name("not_rules.txt");
object_store
.put(&not_rules_path, Bytes::new())
.await
.unwrap();
// Put a file in a directory that's an invalid database name - this WON'T be in the list
let invalid_db_name = ("a".repeat(65)).to_string();
let mut invalid_db_name_rules_path = object_store.new_path();
invalid_db_name_rules_path.push_all_dirs(&[&server_id.to_string(), &invalid_db_name, "0"]);
invalid_db_name_rules_path.set_file_name("rules.pb");
object_store
.put(&invalid_db_name_rules_path, Bytes::new())
.await
.unwrap();
let possible = IoxObjectStore::list_possible_databases(&object_store, server_id)
.await
.unwrap();
let mut names: Vec<_> = possible.into_iter().map(|d| d.0).collect();
names.sort();
assert_eq!(names, vec![db_deleted, db_normal, not_a_db]);
}
#[tokio::test]
async fn list_all_databases_returns_generation_info() {
let object_store = make_object_store();
@ -1298,93 +1196,6 @@ mod tests {
);
}
#[tokio::test]
async fn list_deleted_databases_metadata() {
let object_store = make_object_store();
let server_id = make_server_id();
// Create a normal database, will NOT be in the list of deleted databases
let db_normal = DatabaseName::new("db_normal").unwrap();
create_database(Arc::clone(&object_store), server_id, &db_normal).await;
// Create a database, then delete it - will be in the list once
let db_deleted = DatabaseName::new("db_deleted").unwrap();
let db_deleted_iox_store =
create_database(Arc::clone(&object_store), server_id, &db_deleted).await;
delete_database(&db_deleted_iox_store).await;
// Create, delete, create - will be in the list once
let db_reincarnated = DatabaseName::new("db_reincarnated").unwrap();
let db_reincarnated_iox_store =
create_database(Arc::clone(&object_store), server_id, &db_reincarnated).await;
delete_database(&db_reincarnated_iox_store).await;
create_database(Arc::clone(&object_store), server_id, &db_reincarnated).await;
// Create, delete, create, delete - will be in the list twice
let db_deleted_twice = DatabaseName::new("db_deleted_twice").unwrap();
let db_deleted_twice_iox_store =
create_database(Arc::clone(&object_store), server_id, &db_deleted_twice).await;
delete_database(&db_deleted_twice_iox_store).await;
let db_deleted_twice_iox_store =
create_database(Arc::clone(&object_store), server_id, &db_deleted_twice).await;
delete_database(&db_deleted_twice_iox_store).await;
// Put a file in a directory that looks like a database directory but has no rules,
// won't be in the list because there's no tombstone file
let not_a_db = DatabaseName::new("not_a_db").unwrap();
let mut not_rules_path = object_store.new_path();
not_rules_path.push_all_dirs(&[&server_id.to_string(), not_a_db.as_str(), "0"]);
not_rules_path.set_file_name("not_rules.txt");
object_store
.put(&not_rules_path, Bytes::new())
.await
.unwrap();
// Put a file in a directory that's an invalid database name - won't be in the list
let invalid_db_name = ("a".repeat(65)).to_string();
let mut invalid_db_name_rules_path = object_store.new_path();
invalid_db_name_rules_path.push_all_dirs(&[&server_id.to_string(), &invalid_db_name, "0"]);
invalid_db_name_rules_path.set_file_name("rules.pb");
object_store
.put(&invalid_db_name_rules_path, Bytes::new())
.await
.unwrap();
// Put a file in a directory that looks like a database name, but doesn't look like a
// generation directory - won't be in the list
let no_generations = DatabaseName::new("no_generations").unwrap();
let mut no_generations_path = object_store.new_path();
no_generations_path.push_all_dirs(&[
&server_id.to_string(),
no_generations.as_str(),
"not-a-generation",
]);
no_generations_path.set_file_name("not_rules.txt");
object_store
.put(&no_generations_path, Bytes::new())
.await
.unwrap();
let mut deleted_dbs = IoxObjectStore::list_deleted_databases(&object_store, server_id)
.await
.unwrap();
deleted_dbs.sort_by_key(|d| (d.name.clone(), d.generation_id));
assert_eq!(deleted_dbs.len(), 4);
assert_eq!(deleted_dbs[0].name, db_deleted);
assert_eq!(deleted_dbs[0].generation_id.inner, 0);
assert_eq!(deleted_dbs[1].name, db_deleted_twice);
assert_eq!(deleted_dbs[1].generation_id.inner, 0);
assert_eq!(deleted_dbs[2].name, db_deleted_twice);
assert_eq!(deleted_dbs[2].generation_id.inner, 1);
assert_eq!(deleted_dbs[3].name, db_reincarnated);
assert_eq!(deleted_dbs[3].generation_id.inner, 0);
}
async fn restore_database(
object_store: Arc<ObjectStore>,
server_id: ServerId,
@ -1417,11 +1228,12 @@ mod tests {
delete_database(&db_iox_store).await;
// Get one generation ID from the list of deleted databases
let deleted_dbs = IoxObjectStore::list_deleted_databases(&object_store, server_id)
let listed_dbs = IoxObjectStore::list_detailed_databases(&object_store, server_id)
.await
.unwrap();
assert_eq!(deleted_dbs.len(), 2);
let deleted_db = deleted_dbs.iter().find(|d| d.name == db).unwrap();
assert_eq!(listed_dbs.len(), 2);
let deleted_generations: Vec<_> = listed_dbs.iter().filter(|d| d.name == db).collect();
let deleted_db = deleted_generations[0];
// Restore the generation
restore_database(
@ -1439,14 +1251,8 @@ mod tests {
.unwrap();
assert_eq!(all_dbs.len(), 1);
// The other deleted generation should be the only item in the deleted list
let deleted_dbs = IoxObjectStore::list_deleted_databases(&object_store, server_id)
.await
.unwrap();
assert_eq!(deleted_dbs.len(), 1);
// Try to restore the other deleted database
let deleted_db = deleted_dbs.iter().find(|d| d.name == db).unwrap();
let deleted_db = deleted_generations[1];
let err = restore_database(
Arc::clone(&object_store),
server_id,


@ -2,7 +2,7 @@
name = "lifecycle"
version = "0.1.0"
authors = ["Raphael Taylor-Davies <r.taylordavies@googlemail.com>"]
edition = "2018"
edition = "2021"
description = "Implements the IOx data lifecycle"
[dependencies]


@ -3,13 +3,14 @@ name = "logfmt"
version = "0.1.0"
authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
description = "tracing_subscriber layer for writing out logfmt formatted events"
edition = "2018"
edition = "2021"
[dependencies] # In alphabetical order
observability_deps = { path = "../observability_deps" }
tracing-subscriber = "0.2"
tracing-subscriber = "0.3"
[dev-dependencies] # In alphabetical order
once_cell = { version = "1.4.0", features = ["parking_lot"] }
parking_lot = "0.11.2"
regex = "1.4.3"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }


@ -19,12 +19,18 @@ use tracing_subscriber::{fmt::MakeWriter, layer::Context, registry::LookupSpan,
/// looked very small and did not (obviously) work with the tracing subscriber
///
/// [logfmt]: https://brandur.org/logfmt
pub struct LogFmtLayer<W: MakeWriter> {
pub struct LogFmtLayer<W>
where
W: for<'writer> MakeWriter<'writer>,
{
writer: W,
display_target: bool,
}
impl<W: MakeWriter> LogFmtLayer<W> {
impl<W> LogFmtLayer<W>
where
W: for<'writer> MakeWriter<'writer>,
{
/// Create a new logfmt Layer to pass into tracing_subscriber
///
/// Note this layer simply formats and writes to the specified writer. It
@ -68,7 +74,7 @@ impl<W: MakeWriter> LogFmtLayer<W> {
impl<S, W> Layer<S> for LogFmtLayer<W>
where
W: MakeWriter + 'static,
W: for<'writer> MakeWriter<'writer> + 'static,
S: Subscriber + for<'a> LookupSpan<'a>,
{
fn register_callsite(
@ -78,7 +84,7 @@ where
Interest::always()
}
fn new_span(&self, attrs: &tracing::span::Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
fn on_new_span(&self, attrs: &tracing::span::Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
let writer = self.writer.make_writer();
let metadata = ctx.metadata(id).expect("span should have metadata");
let mut p = FieldPrinter::new(writer, metadata.level(), self.display_target);


@ -363,7 +363,7 @@ impl std::io::Write for CapturedWriter {
}
}
impl MakeWriter for CapturedWriter {
impl MakeWriter<'_> for CapturedWriter {
type Writer = Self;
fn make_writer(&self) -> Self::Writer {


@ -2,7 +2,7 @@
name = "metric"
version = "0.1.0"
authors = ["Raphael Taylor-Davies <r.taylordavies@googlemail.com>"]
edition = "2018"
edition = "2021"
[dependencies] # In alphabetical order


@ -2,7 +2,7 @@
name = "metric_exporters"
version = "0.1.0"
authors = ["Raphael Taylor-Davies <r.taylordavies@googlemail.com>"]
edition = "2018"
edition = "2021"
[dependencies] # In alphabetical order


@ -1,7 +1,7 @@
[package]
name = "mutable_batch"
version = "0.1.0"
edition = "2018"
edition = "2021"
description = "A mutable arrow RecordBatch"
[dependencies]


@ -1,10 +1,9 @@
//! A [`Column`] stores the rows for a given column name
use std::fmt::Formatter;
use std::iter::Enumerate;
use std::iter::{Enumerate, Zip};
use std::mem;
use std::sync::Arc;
use std::{convert::TryInto, iter::Zip};
use arrow::error::ArrowError;
use arrow::{


@ -175,6 +175,20 @@ impl MutableBatch {
Ok(())
}
/// Extend this [`MutableBatch`] with `ranges` rows from `other`
pub fn extend_from_ranges(
&mut self,
other: &Self,
ranges: &[Range<usize>],
) -> writer::Result<()> {
let to_insert = ranges.iter().map(|x| x.end - x.start).sum();
let mut writer = writer::Writer::new(self, to_insert);
writer.write_batch_ranges(other, ranges)?;
writer.commit();
Ok(())
}
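A brief usage sketch (not from this commit; only `extend_from_ranges` and its signature come from the hunk above, the surrounding function and the compatible-schema requirement are assumptions):

use mutable_batch::MutableBatch;

fn merge_example(dst: &mut MutableBatch, src: &MutableBatch) {
    // Copy rows 0..2 and 5..8 from `src` into `dst` under a single writer commit.
    dst.extend_from_ranges(src, &[0..2, 5..8])
        .expect("column types of the two batches must be compatible");
}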
/// Returns a reference to the specified column
pub(crate) fn column(&self, column: &str) -> Result<&Column> {
let idx = self


@ -499,86 +499,105 @@ impl<'a> Writer<'a> {
src: &MutableBatch,
range: Range<usize>,
) -> Result<()> {
if range.start == 0 && range.end == src.row_count {
self.write_batch_ranges(src, &[range])
}
/// Write the rows identified by `ranges` to the provided MutableBatch
pub(crate) fn write_batch_ranges(
&mut self,
src: &MutableBatch,
ranges: &[Range<usize>],
) -> Result<()> {
let to_insert = self.to_insert;
if to_insert == src.row_count {
return self.write_batch(src);
}
assert_eq!(range.end - range.start, self.to_insert);
for (src_col_name, src_col_idx) in &src.column_names {
let src_col = &src.columns[*src_col_idx];
let (dst_col_idx, dst_col) = self.column_mut(src_col_name, src_col.influx_type)?;
let stats = match (&mut dst_col.data, &src_col.data) {
(ColumnData::F64(dst_data, _), ColumnData::F64(src_data, _)) => {
dst_data.extend_from_slice(&src_data[range.clone()]);
Statistics::F64(compute_stats(src_col.valid.bytes(), range.clone(), |x| {
&src_data[x]
}))
}
(ColumnData::I64(dst_data, _), ColumnData::I64(src_data, _)) => {
dst_data.extend_from_slice(&src_data[range.clone()]);
Statistics::I64(compute_stats(src_col.valid.bytes(), range.clone(), |x| {
&src_data[x]
}))
}
(ColumnData::U64(dst_data, _), ColumnData::U64(src_data, _)) => {
dst_data.extend_from_slice(&src_data[range.clone()]);
Statistics::U64(compute_stats(src_col.valid.bytes(), range.clone(), |x| {
&src_data[x]
}))
}
(ColumnData::F64(dst_data, _), ColumnData::F64(src_data, _)) => Statistics::F64(
write_slice(to_insert, ranges, src_col.valid.bytes(), src_data, dst_data),
),
(ColumnData::I64(dst_data, _), ColumnData::I64(src_data, _)) => Statistics::I64(
write_slice(to_insert, ranges, src_col.valid.bytes(), src_data, dst_data),
),
(ColumnData::U64(dst_data, _), ColumnData::U64(src_data, _)) => Statistics::U64(
write_slice(to_insert, ranges, src_col.valid.bytes(), src_data, dst_data),
),
(ColumnData::Bool(dst_data, _), ColumnData::Bool(src_data, _)) => {
dst_data.extend_from_range(src_data, range.clone());
Statistics::Bool(compute_bool_stats(
src_col.valid.bytes(),
range.clone(),
src_data,
))
dst_data.reserve(to_insert);
let mut stats = StatValues::new_empty();
for range in ranges {
dst_data.extend_from_range(src_data, range.clone());
compute_bool_stats(
src_col.valid.bytes(),
range.clone(),
src_data,
&mut stats,
)
}
Statistics::Bool(stats)
}
(ColumnData::String(dst_data, _), ColumnData::String(src_data, _)) => {
dst_data.extend_from_range(src_data, range.clone());
Statistics::String(compute_stats(src_col.valid.bytes(), range.clone(), |x| {
src_data.get(x).unwrap()
}))
let mut stats = StatValues::new_empty();
for range in ranges {
dst_data.extend_from_range(src_data, range.clone());
compute_stats(src_col.valid.bytes(), range.clone(), &mut stats, |x| {
src_data.get(x).unwrap()
})
}
Statistics::String(stats)
}
(
ColumnData::Tag(dst_data, dst_dict, _),
ColumnData::Tag(src_data, src_dict, _),
) => {
dst_data.reserve(to_insert);
let mut mapping: Vec<_> = vec![None; src_dict.values().len()];
let mut stats = StatValues::new_empty();
dst_data.extend(src_data[range.clone()].iter().map(|src_id| match *src_id {
INVALID_DID => {
stats.update_for_nulls(1);
INVALID_DID
}
_ => {
let maybe_did = &mut mapping[*src_id as usize];
match maybe_did {
Some(did) => {
stats.total_count += 1;
*did
for range in ranges {
dst_data.extend(src_data[range.clone()].iter().map(
|src_id| match *src_id {
INVALID_DID => {
stats.update_for_nulls(1);
INVALID_DID
}
None => {
let value = src_dict.lookup_id(*src_id).unwrap();
stats.update(value);
_ => {
let maybe_did = &mut mapping[*src_id as usize];
match maybe_did {
Some(did) => {
stats.total_count += 1;
*did
}
None => {
let value = src_dict.lookup_id(*src_id).unwrap();
stats.update(value);
let did = dst_dict.lookup_value_or_insert(value);
*maybe_did = Some(did);
did
let did = dst_dict.lookup_value_or_insert(value);
*maybe_did = Some(did);
did
}
}
}
}
}
}));
},
));
}
Statistics::String(stats)
}
_ => unreachable!(),
};
dst_col
.valid
.extend_from_range(&src_col.valid, range.clone());
dst_col.valid.reserve(to_insert);
for range in ranges {
dst_col
.valid
.extend_from_range(&src_col.valid, range.clone());
}
self.statistics.push((dst_col_idx, stats));
}
@ -707,12 +726,16 @@ fn append_valid_mask(column: &mut Column, valid_mask: Option<&[u8]>, to_insert:
}
}
fn compute_bool_stats(valid: &[u8], range: Range<usize>, col_data: &BitSet) -> StatValues<bool> {
fn compute_bool_stats(
valid: &[u8],
range: Range<usize>,
col_data: &BitSet,
stats: &mut StatValues<bool>,
) {
// There are likely faster ways to do this
let indexes =
iter_set_positions_with_offset(valid, range.start).take_while(|idx| *idx < range.end);
let mut stats = StatValues::new_empty();
for index in indexes {
let value = col_data.get(index);
stats.update(&value)
@ -720,11 +743,33 @@ fn compute_bool_stats(valid: &[u8], range: Range<usize>, col_data: &BitSet) -> S
let count = range.end - range.start;
stats.update_for_nulls(count as u64 - stats.total_count);
}
fn write_slice<T>(
to_insert: usize,
ranges: &[Range<usize>],
valid: &[u8],
src_data: &[T],
dst_data: &mut Vec<T>,
) -> StatValues<T>
where
T: Clone + PartialOrd + IsNan,
{
dst_data.reserve(to_insert);
let mut stats = StatValues::new_empty();
for range in ranges {
dst_data.extend_from_slice(&src_data[range.clone()]);
compute_stats(valid, range.clone(), &mut stats, |x| &src_data[x]);
}
stats
}
fn compute_stats<'a, T, U, F>(valid: &[u8], range: Range<usize>, accessor: F) -> StatValues<T>
where
fn compute_stats<'a, T, U, F>(
valid: &[u8],
range: Range<usize>,
stats: &mut StatValues<T>,
accessor: F,
) where
U: 'a + ToOwned<Owned = T> + PartialOrd + ?Sized + IsNan,
F: Fn(usize) -> &'a U,
T: std::borrow::Borrow<U>,
@ -733,14 +778,12 @@ where
.take_while(|idx| *idx < range.end)
.map(accessor);
let mut stats = StatValues::new_empty();
for value in values {
stats.update(value)
}
let count = range.end - range.start;
stats.update_for_nulls(count as u64 - stats.total_count);
stats
}
impl<'a> Drop for Writer<'a> {


@ -1,10 +1,11 @@
[package]
name = "mutable_batch_lp"
version = "0.1.0"
edition = "2018"
edition = "2021"
description = "Conversion logic for line protocol -> MutableBatch"
[dependencies]
hashbrown = "0.11"
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
mutable_batch = { path = "../mutable_batch" }
schema = { path = "../schema" }


@ -4,7 +4,7 @@ use bytes::Bytes;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use flate2::read::GzDecoder;
use mutable_batch_lp::lines_to_batch;
use mutable_batch_lp::lines_to_batches;
fn generate_lp_bytes() -> Bytes {
let raw = include_bytes!("../../tests/fixtures/lineproto/read_filter.lp.gz");
@ -23,7 +23,9 @@ pub fn write_lp(c: &mut Criterion) {
group.bench_function(BenchmarkId::from_parameter(count), |b| {
b.iter(|| {
for _ in 0..*count {
lines_to_batch(std::str::from_utf8(&lp_bytes).unwrap(), 0).unwrap();
let batches =
lines_to_batches(std::str::from_utf8(&lp_bytes).unwrap(), 0).unwrap();
assert_eq!(batches.len(), 1);
}
});
});


@ -11,6 +11,7 @@
clippy::clone_on_ref_ptr
)]
use hashbrown::HashMap;
use influxdb_line_protocol::{parse_lines, FieldValue, ParsedLine};
use mutable_batch::writer::Writer;
use mutable_batch::MutableBatch;
@ -36,18 +37,26 @@ pub enum Error {
/// Result type for line protocol conversion
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Converts the provided lines of line protocol to a [`MutableBatch`]
pub fn lines_to_batch(lines: &str, default_time: i64) -> Result<MutableBatch> {
let mut batch = MutableBatch::new();
/// Converts the provided lines of line protocol to a set of [`MutableBatch`]
/// keyed by measurement name
pub fn lines_to_batches(lines: &str, default_time: i64) -> Result<HashMap<String, MutableBatch>> {
let mut batches = HashMap::new();
for (line_idx, maybe_line) in parse_lines(lines).enumerate() {
let line = maybe_line.context(LineProtocol { line: line_idx + 1 })?;
let measurement = line.series.measurement.as_str();
let (_, batch) = batches
.raw_entry_mut()
.from_key(measurement)
.or_insert_with(|| (measurement.to_string(), MutableBatch::new()));
// TODO: Reuse writer
let mut writer = Writer::new(&mut batch, 1);
let mut writer = Writer::new(batch, 1);
write_line(&mut writer, line, default_time).context(Write { line: line_idx + 1 })?;
writer.commit();
}
Ok(batch)
Ok(batches)
}
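A short usage sketch of the new entry point (not from this commit; the line-protocol input is invented, the signature is the one shown above):

fn lines_to_batches_example() {
    // Two measurements produce two MutableBatch entries keyed by table name.
    let lp = "cpu,host=a usage=1 10\nmem,host=a used=2 10";
    let batches = mutable_batch_lp::lines_to_batches(lp, 0).unwrap();
    assert_eq!(batches.len(), 2);
    assert!(batches.contains_key("cpu") && batches.contains_key("mem"));
}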
fn write_line(
@ -95,10 +104,14 @@ mod tests {
fn test_basic() {
let lp = r#"cpu,tag1=v1,tag2=v2 val=2i 0
cpu,tag1=v4,tag2=v1 val=2i 0
mem,tag1=v2 ival=3i 0
cpu,tag2=v2 val=3i 1
cpu,tag1=v1,tag2=v2 fval=2.0"#;
cpu,tag1=v1,tag2=v2 fval=2.0
mem,tag1=v5 ival=2i 1
"#;
let batch = lines_to_batch(lp, 5).unwrap();
let batch = lines_to_batches(lp, 5).unwrap();
assert_eq!(batch.len(), 2);
assert_batches_eq!(
&[
@ -111,7 +124,19 @@ mod tests {
"| 2 | v1 | v2 | 1970-01-01T00:00:00.000000005Z | |",
"+------+------+------+--------------------------------+-----+",
],
&[batch.to_arrow(Selection::All).unwrap()]
&[batch["cpu"].to_arrow(Selection::All).unwrap()]
);
assert_batches_eq!(
&[
"+------+------+--------------------------------+",
"| ival | tag1 | time |",
"+------+------+--------------------------------+",
"| 3 | v2 | 1970-01-01T00:00:00Z |",
"| 2 | v5 | 1970-01-01T00:00:00.000000001Z |",
"+------+------+--------------------------------+",
],
&[batch["mem"].to_arrow(Selection::All).unwrap()]
);
}
}


@ -1,7 +1,7 @@
[package]
name = "mutable_batch_pb"
version = "0.1.0"
edition = "2018"
edition = "2021"
description = "Conversion logic for binary write protocol <-> MutableBatch"
[dependencies]


@ -2,7 +2,7 @@
name = "mutable_buffer"
version = "0.1.0"
authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
edition = "2018"
edition = "2021"
[dependencies] # In alphabetical order
arrow = { version = "6.0", features = ["prettyprint"] }


@ -2,7 +2,7 @@
name = "object_store"
version = "0.1.0"
authors = ["Paul Dix <paul@pauldix.net>"]
edition = "2018"
edition = "2021"
[dependencies] # In alphabetical order
async-trait = "0.1.42"


@ -131,6 +131,11 @@ pub enum Error {
#[snafu(display("Missing aws-secret-access-key"))]
MissingSecretAccessKey,
NotFound {
location: String,
source: rusoto_core::RusotoError<rusoto_s3::GetObjectError>,
},
}
/// Configuration for connecting to [Amazon S3](https://aws.amazon.com/s3/).
@ -208,9 +213,18 @@ impl ObjectStoreApi for AmazonS3 {
.client
.get_object(get_request)
.await
.context(UnableToGetData {
bucket: self.bucket_name.to_owned(),
location: key.clone(),
.map_err(|e| match e {
rusoto_core::RusotoError::Service(rusoto_s3::GetObjectError::NoSuchKey(_)) => {
Error::NotFound {
location: key.clone(),
source: e,
}
}
_ => Error::UnableToGetData {
bucket: self.bucket_name.to_owned(),
location: key.clone(),
source: e,
},
})?
.body
.context(NoData {
@ -729,20 +743,20 @@ mod tests {
let err = get_nonexistent_object(&integration, Some(location))
.await
.unwrap_err();
if let Some(ObjectStoreError::AwsObjectStoreError {
source:
Error::UnableToGetData {
source,
bucket,
location,
},
}) = err.downcast_ref::<ObjectStoreError>()
if let Some(ObjectStoreError::NotFound { location, source }) =
err.downcast_ref::<ObjectStoreError>()
{
assert!(matches!(
source,
rusoto_core::RusotoError::Service(rusoto_s3::GetObjectError::NoSuchKey(_))
));
assert_eq!(bucket, &config.bucket);
let source_variant = source.downcast_ref::<rusoto_core::RusotoError<_>>();
assert!(
matches!(
source_variant,
Some(rusoto_core::RusotoError::Service(
rusoto_s3::GetObjectError::NoSuchKey(_)
)),
),
"got: {:?}",
source_variant
);
assert_eq!(location, NON_EXISTENT_NAME);
} else {
panic!("unexpected error type: {:?}", err);


@ -30,7 +30,9 @@ pub enum Error {
},
#[snafu(display("Unable to walk dir: {}", source))]
UnableToWalkDir { source: walkdir::Error },
UnableToWalkDir {
source: walkdir::Error,
},
#[snafu(display("Unable to access metadata for {}: {}", path.display(), source))]
UnableToAccessMetadata {
@ -39,22 +41,44 @@ pub enum Error {
},
#[snafu(display("Unable to copy data to file: {}", source))]
UnableToCopyDataToFile { source: io::Error },
UnableToCopyDataToFile {
source: io::Error,
},
#[snafu(display("Unable to create dir {}: {}", path.display(), source))]
UnableToCreateDir { source: io::Error, path: PathBuf },
UnableToCreateDir {
source: io::Error,
path: PathBuf,
},
#[snafu(display("Unable to create file {}: {}", path.display(), err))]
UnableToCreateFile { path: PathBuf, err: io::Error },
UnableToCreateFile {
path: PathBuf,
err: io::Error,
},
#[snafu(display("Unable to delete file {}: {}", path.display(), source))]
UnableToDeleteFile { source: io::Error, path: PathBuf },
UnableToDeleteFile {
source: io::Error,
path: PathBuf,
},
#[snafu(display("Unable to open file {}: {}", path.display(), source))]
UnableToOpenFile { source: io::Error, path: PathBuf },
UnableToOpenFile {
source: io::Error,
path: PathBuf,
},
#[snafu(display("Unable to read data from file {}: {}", path.display(), source))]
UnableToReadBytes { source: io::Error, path: PathBuf },
UnableToReadBytes {
source: io::Error,
path: PathBuf,
},
NotFound {
location: String,
source: io::Error,
},
}
/// Local filesystem storage suitable for testing or for opting out of using a
@ -110,9 +134,19 @@ impl ObjectStoreApi for File {
async fn get(&self, location: &Self::Path) -> Result<BoxStream<'static, Result<Bytes>>> {
let path = self.path(location);
let file = fs::File::open(&path)
.await
.context(UnableToOpenFile { path: &path })?;
let file = fs::File::open(&path).await.map_err(|e| {
if e.kind() == std::io::ErrorKind::NotFound {
Error::NotFound {
location: location.to_string(),
source: e,
}
} else {
Error::UnableToOpenFile {
path: path.clone(),
source: e,
}
}
})?;
let s = FramedRead::new(file, BytesCodec::new())
.map_ok(|b| b.freeze())
@ -297,14 +331,12 @@ impl File {
#[cfg(test)]
mod tests {
use std::{fs::set_permissions, os::unix::prelude::PermissionsExt};
use super::*;
use crate::{
tests::{list_with_delimiter, put_get_delete_list},
ObjectStore, ObjectStoreApi, ObjectStorePath,
tests::{get_nonexistent_object, list_with_delimiter, put_get_delete_list},
Error as ObjectStoreError, ObjectStore, ObjectStoreApi, ObjectStorePath,
};
use std::{fs::set_permissions, os::unix::prelude::PermissionsExt};
use tempfile::TempDir;
#[tokio::test]
@ -395,4 +427,32 @@ mod tests {
// `list_with_delimiter
assert!(store.list_with_delimiter(&store.new_path()).await.is_err());
}
const NON_EXISTENT_NAME: &str = "nonexistentname";
#[tokio::test]
async fn get_nonexistent_location() {
let root = TempDir::new().unwrap();
let integration = ObjectStore::new_file(root.path());
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
let err = get_nonexistent_object(&integration, Some(location))
.await
.unwrap_err();
if let Some(ObjectStoreError::NotFound { location, source }) =
err.downcast_ref::<ObjectStoreError>()
{
let source_variant = source.downcast_ref::<std::io::Error>();
assert!(
matches!(source_variant, Some(std::io::Error { .. }),),
"got: {:?}",
source_variant
);
assert_eq!(location, NON_EXISTENT_NAME);
} else {
panic!("unexpected error type: {:?}", err);
}
}
}


@ -68,6 +68,11 @@ pub enum Error {
bucket: String,
location: String,
},
NotFound {
location: String,
source: cloud_storage::Error,
},
}
/// Configuration for connecting to [Google Cloud Storage](https://cloud.google.com/storage/).
@ -122,9 +127,18 @@ impl ObjectStoreApi for GoogleCloudStorage {
.object()
.download(&bucket_name, &location_copy)
.await
.context(UnableToGetData {
bucket: &self.bucket_name,
location,
.map_err(|e| match e {
cloud_storage::Error::Other(ref text) if text.starts_with("No such object") => {
Error::NotFound {
location,
source: e,
}
}
_ => Error::UnableToGetData {
bucket: bucket_name.clone(),
location,
source: e,
},
})?;
Ok(futures::stream::once(async move { Ok(bytes.into()) }).boxed())
@ -337,21 +351,15 @@ mod test {
.await
.unwrap_err();
if let Some(ObjectStoreError::GcsObjectStoreError {
source:
Error::UnableToGetData {
source,
bucket,
location,
},
}) = err.downcast_ref::<ObjectStoreError>()
if let Some(ObjectStoreError::NotFound { location, source }) =
err.downcast_ref::<ObjectStoreError>()
{
let source_variant = source.downcast_ref::<cloud_storage::Error>();
assert!(
matches!(source, cloud_storage::Error::Other(_)),
matches!(source_variant, Some(cloud_storage::Error::Other(_))),
"got: {:?}",
source
source_variant
);
assert_eq!(bucket, &config.bucket);
assert_eq!(location, NON_EXISTENT_NAME);
} else {
panic!("unexpected error type: {:?}", err)


@ -292,12 +292,9 @@ impl ObjectStoreApi for ObjectStore {
(InMemoryThrottled(in_mem_throttled), path::Path::InMemory(location)) => {
in_mem_throttled.get(location).await?.err_into().boxed()
}
(File(file), path::Path::File(location)) => file
.get(location)
.await
.context(FileObjectStoreError)?
.err_into()
.boxed(),
(File(file), path::Path::File(location)) => {
file.get(location).await?.err_into().boxed()
}
(MicrosoftAzure(azure), path::Path::MicrosoftAzure(location)) => {
azure.get(location).await?.err_into().boxed()
}
@ -609,25 +606,49 @@ pub enum Error {
#[snafu(display("{}", source))]
DummyObjectStoreError { source: dummy::Error },
#[snafu(display("Object at location {} not found: {}", location, source))]
NotFound {
location: String,
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},
}
impl From<disk::Error> for Error {
fn from(source: disk::Error) -> Self {
Self::FileObjectStoreError { source }
match source {
disk::Error::NotFound { location, source } => Self::NotFound {
location,
source: source.into(),
},
_ => Self::FileObjectStoreError { source },
}
}
}
#[cfg(feature = "gcp")]
impl From<gcp::Error> for Error {
fn from(source: gcp::Error) -> Self {
Self::GcsObjectStoreError { source }
match source {
gcp::Error::NotFound { location, source } => Self::NotFound {
location,
source: source.into(),
},
_ => Self::GcsObjectStoreError { source },
}
}
}
#[cfg(feature = "aws")]
impl From<aws::Error> for Error {
fn from(source: aws::Error) -> Self {
Self::AwsObjectStoreError { source }
match source {
aws::Error::NotFound { location, source } => Self::NotFound {
location,
source: source.into(),
},
_ => Self::AwsObjectStoreError { source },
}
}
}
@ -640,7 +661,13 @@ impl From<azure::Error> for Error {
impl From<memory::Error> for Error {
fn from(source: memory::Error) -> Self {
Self::InMemoryObjectStoreError { source }
match source {
memory::Error::NoDataInMemory { ref location } => Self::NotFound {
location: location.into(),
source: source.into(),
},
// currently "not found" is the only error that can happen with the in-memory store
}
}
}
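// Not part of the diff: with each backend's "not found" case funneled into the
// crate-level `Error::NotFound { location, source }`, callers can treat a missing
// object uniformly regardless of backend. A sketch of the caller-side pattern
// (the server-config loading later in this change does exactly this):
//
//     match store.get(&path).await {
//         Err(Error::NotFound { location, .. }) => { /* treat as absent */ }
//         Err(other) => return Err(other),
//         Ok(stream) => { /* consume the byte stream */ }
//     }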

View File

@ -159,8 +159,8 @@ mod tests {
use super::*;
use crate::{
tests::{list_with_delimiter, put_get_delete_list},
ObjectStore, ObjectStoreApi, ObjectStorePath,
tests::{get_nonexistent_object, list_with_delimiter, put_get_delete_list},
Error as ObjectStoreError, ObjectStore, ObjectStoreApi, ObjectStorePath,
};
use futures::TryStreamExt;
@ -194,4 +194,31 @@ mod tests {
.unwrap();
assert_eq!(&*read_data, expected_data);
}
const NON_EXISTENT_NAME: &str = "nonexistentname";
#[tokio::test]
async fn nonexistent_location() {
let integration = ObjectStore::new_in_memory();
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
let err = get_nonexistent_object(&integration, Some(location))
.await
.unwrap_err();
if let Some(ObjectStoreError::NotFound { location, source }) =
err.downcast_ref::<ObjectStoreError>()
{
let source_variant = source.downcast_ref::<Error>();
assert!(
matches!(source_variant, Some(Error::NoDataInMemory { .. }),),
"got: {:?}",
source_variant
);
assert_eq!(location, NON_EXISTENT_NAME);
} else {
panic!("unexpected error type: {:?}", err);
}
}
}

View File

@ -2,7 +2,7 @@
name = "observability_deps"
version = "0.1.0"
authors = ["Paul Dix <paul@pauldix.net>"]
edition = "2018"
edition = "2021"
description = "Observability ecosystem dependencies for InfluxDB IOx, to ensure consistent versions and unified updates"
[dependencies] # In alphabetical order

View File

@ -2,7 +2,7 @@
name = "packers"
version = "0.1.0"
authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
edition = "2018"
edition = "2021"
[dependencies] # In alphabetical order
arrow = { version = "6.0", features = ["prettyprint"] }

View File

@ -2,7 +2,7 @@
name = "panic_logging"
version = "0.1.0"
authors = ["Paul Dix <paul@pauldix.net>"]
edition = "2018"
edition = "2021"
[dependencies] # In alphabetical order
observability_deps = { path = "../observability_deps" }

View File

@ -1,7 +1,7 @@
[package]
name = "parquet_catalog"
version = "0.1.0"
edition = "2018"
edition = "2021"
[dependencies]
arrow = { version = "6.0", features = ["prettyprint"] }

View File

@ -2,7 +2,7 @@
name = "parquet_file"
version = "0.1.0"
authors = ["Nga Tran <nga-tran@live.com>"]
edition = "2018"
edition = "2021"
[dependencies] # In alphabetical order
arrow = { version = "6.0", features = ["prettyprint"] }

View File

@ -1,7 +1,7 @@
[package]
name = "persistence_windows"
version = "0.1.0"
edition = "2018"
edition = "2021"
[dependencies]
data_types = { path = "../data_types" }

View File

@ -1,7 +1,7 @@
[package]
name = "predicate"
version = "0.1.0"
edition = "2018"
edition = "2021"
[dependencies]
arrow = { version = "6.0", features = ["prettyprint"] }

View File

@ -32,8 +32,9 @@ pub const EMPTY_PREDICATE: Predicate = Predicate {
#[derive(Debug, Clone, Copy)]
/// The result of evaluating a predicate on a set of rows
pub enum PredicateMatch {
/// There is at least one row that matches the predicate
AtLeastOne,
/// There is at least one row that matches the predicate and that has
/// at least one non-null value in each field of the predicate
AtLeastOneNonNullField,
/// There are exactly zero rows that match the predicate
Zero,
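Not part of the diff: a minimal sketch, assuming `PredicateMatch` is in scope, of how a caller is expected to distinguish the renamed variant from `Unknown`; the `TableDecision` enum and `decide` function are hypothetical and only mirror the planner logic shown later in this change.
/// Hypothetical helper, not in the codebase: maps a metadata-only predicate
/// check onto the three planner outcomes used by `table_names`.
enum TableDecision {
    /// metadata alone proves a matching row with non-null predicate fields
    IncludeFromMetadata,
    /// metadata proves no row matches; skip the table entirely
    Exclude,
    /// metadata is inconclusive; build and run the full table_name plan
    NeedsFullPlan,
}

fn decide(pred_result: PredicateMatch) -> TableDecision {
    match pred_result {
        PredicateMatch::AtLeastOneNonNullField => TableDecision::IncludeFromMetadata,
        PredicateMatch::Zero => TableDecision::Exclude,
        PredicateMatch::Unknown => TableDecision::NeedsFullPlan,
    }
}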

View File

@ -2,7 +2,7 @@
name = "query"
version = "0.1.0"
authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
edition = "2018"
edition = "2021"
description = "IOx Query Interface and Executor"
# This crate is designed to be independent of the rest of the IOx

View File

@ -22,20 +22,17 @@ use futures::TryStreamExt;
use observability_deps::tracing::{debug, trace};
use trace::{ctx::SpanContext, span::SpanRecorder};
use crate::{
exec::{
fieldlist::{FieldList, IntoFieldList},
non_null_checker::NonNullCheckerExec,
query_tracing::TracedStream,
schema_pivot::{SchemaPivotExec, SchemaPivotNode},
seriesset::{
converter::{GroupGenerator, SeriesSetConverter},
series::Series,
},
split::StreamSplitExec,
stringset::{IntoStringSet, StringSetRef},
use crate::exec::{
fieldlist::{FieldList, IntoFieldList},
non_null_checker::NonNullCheckerExec,
query_tracing::TracedStream,
schema_pivot::{SchemaPivotExec, SchemaPivotNode},
seriesset::{
converter::{GroupGenerator, SeriesSetConverter},
series::Series,
},
plan::stringset::TableNamePlanBuilder,
split::StreamSplitExec,
stringset::{IntoStringSet, StringSetRef},
};
use crate::plan::{
@ -489,25 +486,6 @@ impl IOxExecutionContext {
}
}
/// Executes table_plans and, if returns some rows, add that table into the return list
/// Tables discovered from meta data won't need any plan
pub async fn to_table_names(&self, builder: TableNamePlanBuilder) -> Result<StringSetRef> {
let ctx = self.child_ctx("to_table_names");
// first get all meta data tables
let mut tables = builder.meta_data_table_names().clone();
// Now run each plan and if it returns data, add it table in
let table_plans = builder.table_plans();
for (table, plan) in table_plans {
if !ctx.run_logical_plan(plan).await?.is_empty() {
tables.insert(table.clone());
}
}
Ok(Arc::new(tables))
}
/// Run the plan and return a record batch reader for reading the results
pub async fn run_logical_plan(&self, plan: LogicalPlan) -> Result<Vec<RecordBatch>> {
self.run_logical_plans(vec![plan]).await

View File

@ -26,7 +26,7 @@ use schema::{InfluxColumnType, Schema, TIME_COLUMN_NAME};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use crate::{
exec::{field::FieldColumns, make_schema_pivot},
exec::{field::FieldColumns, make_non_null_checker, make_schema_pivot},
func::{
selectors::{selector_first, selector_last, selector_max, selector_min, SelectorOutput},
window::make_window_bound_expr,
@ -35,9 +35,7 @@ use crate::{
plan::{
fieldlist::FieldListPlan,
seriesset::{SeriesSetPlan, SeriesSetPlans},
stringset::{
Error as StringSetError, StringSetPlan, StringSetPlanBuilder, TableNamePlanBuilder,
},
stringset::{Error as StringSetError, StringSetPlan, StringSetPlanBuilder},
},
provider::ProviderBuilder,
QueryChunk, QueryChunkMeta, QueryDatabase,
@ -229,11 +227,13 @@ impl InfluxRpcPlanner {
/// . A set of plans for tables whose chunks are either
/// . chunks with deleted data, or
/// . chunks without deleted data whose match cannot be decided from meta data alone
pub fn table_names<D>(&self, database: &D, predicate: Predicate) -> Result<TableNamePlanBuilder>
pub fn table_names<D>(&self, database: &D, predicate: Predicate) -> Result<StringSetPlan>
where
D: QueryDatabase + 'static,
{
let mut builder = TableNamePlanBuilder::new();
debug!(predicate=?predicate, "planning table_names");
let mut builder = StringSetPlanBuilder::new();
let mut normalizer = PredicateNormalizer::new(predicate);
// Mapping between tables and the chunks that need a full plan
@ -243,9 +243,11 @@ impl InfluxRpcPlanner {
// and which chunks need a full plan, grouping them by table
for chunk in database.chunks(normalizer.unnormalized()) {
let table_name = chunk.table_name();
trace!(chunk_id=%chunk.id(), table_name, "Considering table");
// Table is already in the returned table list; no need to discover it from other chunks
if builder.contains_meta_data_table(table_name.to_string()) {
if builder.contains(table_name) {
trace!("already seen");
continue;
}
@ -257,10 +259,11 @@ impl InfluxRpcPlanner {
.or_insert_with(Vec::new)
.push(Arc::clone(&chunk));
} else {
// See if we can have enough info from the chunk's meta data to answer
// that this table participates in the request
// See if we have enough info only from the chunk's
// meta data to know if the table has data that
// matches the predicate
let predicate = normalizer.normalized(table_name);
//
// Try and apply the predicate using only metadata
let pred_result = chunk
.apply_predicate_to_metadata(&predicate)
@ -268,38 +271,38 @@ impl InfluxRpcPlanner {
.context(CheckingChunkPredicate {
chunk_id: chunk.id(),
})?;
//
match pred_result {
PredicateMatch::AtLeastOne => {
PredicateMatch::AtLeastOneNonNullField => {
trace!("Metadata predicate: table matches");
// Meta data of the table covers predicates of the request
builder.append_meta_data_table(table_name.to_string());
builder.append_string(table_name);
}
PredicateMatch::Unknown => {
trace!("Metadata predicate: unknown match");
// We cannot get an answer from the meta data alone, so do a full plan
full_plan_table_chunks
.entry(table_name.to_string())
.or_insert_with(Vec::new)
.push(Arc::clone(&chunk));
}
PredicateMatch::Zero => {} // this chunk's table does not participate in the request
PredicateMatch::Zero => {
trace!("Metadata predicate: zero rows match");
} // this chunk's table does not participate in the request
}
}
}
// remove items from full_plan_table_chunks whose tables are already in the returned list
let meta_data_tables = builder.meta_data_table_names();
for table in meta_data_tables {
full_plan_table_chunks.remove(&table);
// remove items from full_plan_table_chunks whose tables are
// already in the returned list
for table in builder.known_strings_iter() {
trace!(%table, "Table is known to have matches, skipping plan");
full_plan_table_chunks.remove(table);
if full_plan_table_chunks.is_empty() {
break;
}
}
// No full plans needed
if full_plan_table_chunks.is_empty() {
return Ok(builder);
}
// Now build plans for full-plan tables
for (table_name, chunks) in full_plan_table_chunks {
let schema = database.table_schema(&table_name).context(TableRemoved {
@ -308,11 +311,11 @@ impl InfluxRpcPlanner {
if let Some(plan) =
self.table_name_plan(&table_name, schema, &mut normalizer, chunks)?
{
builder.append_plans(table_name, plan);
builder = builder.append_other(plan.into());
}
}
Ok(builder)
builder.build().context(CreatingStringSet)
}
/// Returns a set of plans that produces the names of "tag"
@ -426,14 +429,14 @@ impl InfluxRpcPlanner {
let plan = self.tag_keys_plan(&table_name, schema, &mut normalizer, chunks)?;
if let Some(plan) = plan {
builder = builder.append(plan)
builder = builder.append_other(plan)
}
}
}
// add the known columns we could find from metadata only
builder
.append(known_columns.into())
.append_other(known_columns.into())
.build()
.context(CreatingStringSet)
}
@ -595,13 +598,13 @@ impl InfluxRpcPlanner {
.build()
.context(BuildingPlan)?;
builder = builder.append(plan.into());
builder = builder.append_other(plan.into());
}
}
// add the known values we could find from metadata only
builder
.append(known_values.into())
.append_other(known_values.into())
.build()
.context(CreatingStringSet)
}
@ -829,19 +832,13 @@ impl InfluxRpcPlanner {
chunk_id: chunk.id(),
})?;
match pred_result {
PredicateMatch::AtLeastOne |
if !matches!(pred_result, PredicateMatch::Zero) {
// have to include chunk as we can't rule it out
PredicateMatch::Unknown => {
let table_name = chunk.table_name().to_string();
table_chunks
.entry(table_name)
.or_insert_with(Vec::new)
.push(Arc::clone(&chunk));
}
// Skip chunk here based on metadata
PredicateMatch::Zero => {
}
let table_name = chunk.table_name().to_string();
table_chunks
.entry(table_name)
.or_insert_with(Vec::new)
.push(Arc::clone(&chunk));
}
}
Ok(table_chunks)
@ -959,11 +956,11 @@ impl InfluxRpcPlanner {
Ok(Some(plan))
}
/// Creates a DataFusion LogicalPlan that returns the timestamp
/// for a specified table:
/// Creates a DataFusion LogicalPlan that returns the values in
/// the fields for a specified table:
///
/// The output looks like (time)
/// The time column is chosen because it must be included in all tables
/// The output produces the table name as a single string if any
/// non null values are passed in.
///
/// The data is not sorted in any particular order
///
@ -973,13 +970,11 @@ impl InfluxRpcPlanner {
/// The created plan looks like:
///
/// ```text
/// Projection (select time)
/// NonNullChecker
/// Projection (select fields)
/// Filter(predicate) [optional]
/// Scan
/// ```
// TODO: for optimization in the future, build `select count(*)` plan instead,
// ,but if we do this, we also need to change the way we handle output
// of the function invoking this because it will always return a number
fn table_name_plan<C>(
&self,
table_name: &str,
@ -990,6 +985,7 @@ impl InfluxRpcPlanner {
where
C: QueryChunk + 'static,
{
debug!(%table_name, "Creating table_name full plan");
let scan_and_filter = self.scan_and_filter(table_name, schema, normalizer, chunks)?;
let TableScanAndFilter {
plan_builder,
@ -999,15 +995,11 @@ impl InfluxRpcPlanner {
Some(t) => t,
};
// Selection of only time
let select_exprs = schema
.iter()
.filter_map(|(influx_column_type, field)| match influx_column_type {
Some(InfluxColumnType::Timestamp) => Some(col(field.name())),
Some(_) => None,
None => None,
})
.collect::<Vec<_>>();
// Select only fields requested
let predicate = normalizer.normalized(table_name, Arc::clone(&schema));
let select_exprs: Vec<_> = filtered_fields_iter(&schema, &predicate)
.map(|field| col(field.name().as_str()))
.collect();
let plan = plan_builder
.project(select_exprs)
@ -1015,6 +1007,9 @@ impl InfluxRpcPlanner {
.build()
.context(BuildingPlan)?;
// Add the final node that outputs the table name (or nothing), depending on
// whether any non-null field values are found
let plan = make_non_null_checker(table_name, plan);
Ok(Some(plan))
}

View File

@ -1,4 +1,4 @@
use std::{collections::BTreeMap, sync::Arc};
use std::sync::Arc;
use arrow_util::util::str_iter_to_batch;
use datafusion::logical_plan::LogicalPlan;
@ -97,7 +97,7 @@ impl StringSetPlanBuilder {
/// Append the strings from the passed plan into ourselves if possible,
/// or pass the plan through for later execution
pub fn append(mut self, other: StringSetPlan) -> Self {
pub fn append_other(mut self, other: StringSetPlan) -> Self {
match other {
StringSetPlan::Known(ssref) => match Arc::try_unwrap(ssref) {
Ok(mut ss) => {
@ -117,6 +117,23 @@ impl StringSetPlanBuilder {
self
}
/// Return true if we already know that `s` is contained in the
/// StringSet. Note that if `contains()` returns false, `s` may still
/// be in the StringSet after the deferred plans are executed.
pub fn contains(&self, s: impl AsRef<str>) -> bool {
self.strings.contains(s.as_ref())
}
/// Append a single string to the known set of strings in this builder
pub fn append_string(&mut self, s: impl Into<String>) {
self.strings.insert(s.into());
}
/// Returns an iterator over the currently known strings in this builder
pub fn known_strings_iter(&self) -> impl Iterator<Item = &String> {
self.strings.iter()
}
/// Create a StringSetPlan that produces the deduplicated union
/// of all plans appended to this builder.
pub fn build(self) -> Result<StringSetPlan> {
@ -143,39 +160,6 @@ impl StringSetPlanBuilder {
}
}
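Not part of the diff: a minimal usage sketch of the new builder surface, assuming `StringSetPlanBuilder`, `StringSetPlan`, `LogicalPlan`, and this module's `Result` alias are in scope; the hard-coded "cpu" name and the `undecided` plan list are illustrative only.
// Hypothetical flow mirroring InfluxRpcPlanner::table_names in this change:
// metadata-known names go in via append_string, undecided tables contribute a
// LogicalPlan via append_other, and build() unions both sources.
fn sketch_table_names_flow(undecided: Vec<LogicalPlan>) -> Result<StringSetPlan> {
    let mut builder = StringSetPlanBuilder::new();
    // a table proven to match from metadata alone is recorded directly
    builder.append_string("cpu");
    assert!(builder.contains("cpu")); // contains() reflects only names known so far
    for plan in undecided {
        // `plan.into()` converts a LogicalPlan into a StringSetPlan, as
        // table_names does in this change
        builder = builder.append_other(plan.into());
    }
    builder.build()
}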
#[derive(Debug, Default)]
pub struct TableNamePlanBuilder {
/// Known tables achieved from meta data
meta_data_tables: StringSet,
/// Other tables and their general plans
plans: BTreeMap<String, LogicalPlan>,
}
impl TableNamePlanBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn append_meta_data_table(&mut self, table: String) {
self.meta_data_tables.insert(table);
}
pub fn append_plans(&mut self, table_name: String, plan: LogicalPlan) {
self.plans.insert(table_name, plan);
}
pub fn contains_meta_data_table(&self, table: String) -> bool {
self.meta_data_tables.contains(&table)
}
pub fn meta_data_table_names(&self) -> StringSet {
self.meta_data_tables.clone()
}
pub fn table_plans(&self) -> BTreeMap<String, LogicalPlan> {
self.plans.clone()
}
}
#[cfg(test)]
mod tests {
use crate::exec::{Executor, ExecutorType};
@ -196,8 +180,8 @@ mod tests {
#[test]
fn test_builder_strings_only() {
let plan = StringSetPlanBuilder::new()
.append(to_string_set(&["foo", "bar"]).into())
.append(to_string_set(&["bar", "baz"]).into())
.append_other(to_string_set(&["foo", "bar"]).into())
.append_other(to_string_set(&["bar", "baz"]).into())
.build()
.unwrap();
@ -228,9 +212,9 @@ mod tests {
// when a df plan is appended the whole plan should be different
let plan = StringSetPlanBuilder::new()
.append(to_string_set(&["foo", "bar"]).into())
.append(vec![df_plan].into())
.append(to_string_set(&["baz"]).into())
.append_other(to_string_set(&["foo", "bar"]).into())
.append_other(vec![df_plan].into())
.append_other(to_string_set(&["baz"]).into())
.build()
.unwrap();

View File

@ -2,7 +2,7 @@
name = "query_tests"
version = "0.1.0"
authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
edition = "2018"
edition = "2021"
description = "Tests of the query engine against different database configurations"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

View File

@ -3,9 +3,9 @@ name = "generate"
description = "Creates rust #tests for files in .sql"
version = "0.1.0"
authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
edition = "2018"
edition = "2021"
[dependencies] # In alphabetical order
# Note this is a standalone binary and not part of the overall workspace
[workspace]
[workspace]

View File

@ -1,4 +1,5 @@
//! Tests for the Influx gRPC queries
use datafusion::logical_plan::{col, lit};
use predicate::predicate::{Predicate, PredicateBuilder, EMPTY_PREDICATE};
use query::{
exec::stringset::{IntoStringSet, StringSetRef},
@ -24,11 +25,12 @@ where
let planner = InfluxRpcPlanner::new();
let ctx = db.executor().new_context(query::exec::ExecutorType::Query);
let builder = planner
let plan = planner
.table_names(db.as_ref(), predicate.clone())
.expect("built plan successfully");
let names = ctx
.to_table_names(builder)
.to_string_set(plan)
.await
.expect("converted plan to strings successfully");
@ -53,6 +55,49 @@ async fn list_table_names_no_data_pred() {
run_table_names_test_case(TwoMeasurements {}, EMPTY_PREDICATE, vec!["cpu", "disk"]).await;
}
#[tokio::test]
async fn list_table_names_no_data_passes() {
// no rows pass this predicate
run_table_names_test_case(
TwoMeasurementsManyFields {},
tsp(10000000, 20000000),
vec![],
)
.await;
}
#[tokio::test]
async fn list_table_names_no_non_null_data_passes() {
// only a single row with a null field passes this predicate (expect no table names)
let predicate = PredicateBuilder::default()
.table("o2")
// only get last row of o2 (timestamp = 300)
.timestamp_range(200, 400)
// model predicate like _field='reading' which last row does not have
.field_columns(vec!["reading"])
.build();
run_table_names_test_case(TwoMeasurementsManyFields {}, predicate, vec![]).await;
}
#[tokio::test]
async fn list_table_names_no_non_null_general_data_passes() {
// only a single row with a null field passes this predicate
// (expect no table names) -- has a general purpose predicate to
// force a generic plan
let predicate = PredicateBuilder::default()
.table("o2")
// only get last row of o2 (timestamp = 300)
.timestamp_range(200, 400)
// model predicate like _field='reading' which last row does not have
.field_columns(vec!["reading"])
// (state = CA) OR (temp > 50)
.add_expr(col("state").eq(lit("CA")).or(col("temp").gt(lit(50))))
.build();
run_table_names_test_case(TwoMeasurementsManyFields {}, predicate, vec![]).await;
}
#[tokio::test]
async fn list_table_names_no_data_pred_with_delete() {
run_table_names_test_case(

View File

@ -103,11 +103,11 @@ async fn chunk_pruning_influxrpc() {
let ctx = db.executor().new_context(query::exec::ExecutorType::Query);
let builder = InfluxRpcPlanner::new()
let plan = InfluxRpcPlanner::new()
.table_names(db.as_ref(), predicate)
.unwrap();
let result = ctx.to_table_names(builder).await.unwrap();
let result = ctx.to_string_set(plan).await.unwrap();
assert_eq!(&expected, result.as_ref());

View File

@ -2,7 +2,7 @@
name = "read_buffer"
version = "0.1.0"
authors = ["Edd Robinson <me@edd.io>"]
edition = "2018"
edition = "2021"
# Note this crate is designed to be standalone, and should not depend
# on the IOx Query Engine. The rationale is:

View File

@ -1,4 +1,4 @@
edition = "2018"
edition = "2021"
# Unstable features not yet supported on stable Rust
#wrap_comments = true

View File

@ -2,7 +2,7 @@
name = "schema"
version = "0.1.0"
authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
edition = "2018"
edition = "2021"
description = "IOx Schema definition"
[dependencies]

View File

@ -2,7 +2,7 @@
name = "server"
version = "0.1.0"
authors = ["pauldix <paul@pauldix.net>"]
edition = "2018"
edition = "2021"
[dependencies] # In alphabetical order
arrow = { version = "6.0", features = ["prettyprint"] }

View File

@ -4,6 +4,7 @@ use object_store::ObjectStore;
use observability_deps::tracing::info;
use query::exec::Executor;
use time::TimeProvider;
use trace::TraceCollector;
use write_buffer::config::WriteBufferConfigFactory;
use crate::JobRegistry;
@ -18,13 +19,18 @@ pub struct ApplicationState {
job_registry: Arc<JobRegistry>,
metric_registry: Arc<metric::Registry>,
time_provider: Arc<dyn TimeProvider>,
trace_collector: Option<Arc<dyn TraceCollector>>,
}
impl ApplicationState {
/// Creates a new `ApplicationState`
///
/// Uses the number of CPUs in the system if `num_worker_threads` is not set
pub fn new(object_store: Arc<ObjectStore>, num_worker_threads: Option<usize>) -> Self {
pub fn new(
object_store: Arc<ObjectStore>,
num_worker_threads: Option<usize>,
trace_collector: Option<Arc<dyn TraceCollector>>,
) -> Self {
let num_threads = num_worker_threads.unwrap_or_else(num_cpus::get);
info!(%num_threads, "using specified number of threads per thread pool");
@ -45,6 +51,7 @@ impl ApplicationState {
job_registry,
metric_registry,
time_provider,
trace_collector,
}
}
@ -68,6 +75,10 @@ impl ApplicationState {
&self.time_provider
}
pub fn trace_collector(&self) -> &Option<Arc<dyn TraceCollector>> {
&self.trace_collector
}
pub fn executor(&self) -> &Arc<Executor> {
&self.executor
}
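Not part of the diff: a minimal construction sketch for the new third argument; the ring-buffer collector and in-memory object store mirror the test code later in this change, and the import paths are taken from `use` lines visible elsewhere in the diff.
use std::sync::Arc;

use object_store::ObjectStore;
use trace::{RingBufferTraceCollector, TraceCollector};

fn make_traced_application() -> Arc<ApplicationState> {
    // small in-memory collector, as used by the tracing tests below
    let collector: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
    Arc::new(ApplicationState::new(
        Arc::new(ObjectStore::new_in_memory()),
        None,            // num_worker_threads: fall back to the CPU count
        Some(collector), // the collector is now owned by ApplicationState
    ))
}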

View File

@ -30,7 +30,6 @@ use std::{future::Future, sync::Arc, time::Duration};
use tokio::{sync::Notify, task::JoinError};
use tokio_util::sync::CancellationToken;
use trace::ctx::SpanContext;
use trace::{RingBufferTraceCollector, TraceCollector};
use uuid::Uuid;
const INIT_BACKOFF: Duration = Duration::from_secs(1);
@ -1312,10 +1311,8 @@ impl DatabaseStateCatalogLoaded {
) -> Result<DatabaseStateInitialized, InitError> {
let db = Arc::clone(&self.db);
// TODO: use proper trace collector
let trace_collector: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
let rules = self.provided_rules.rules();
let trace_collector = shared.application.trace_collector();
let write_buffer_factory = shared.application.write_buffer_factory();
let write_buffer_consumer = match rules.write_buffer_connection.as_ref() {
Some(connection) if matches!(connection.direction, WriteBufferDirection::Read) => {
@ -1323,7 +1320,7 @@ impl DatabaseStateCatalogLoaded {
.new_config_read(
shared.config.server_id,
shared.config.name.as_str(),
Some(&trace_collector),
trace_collector.as_ref(),
connection,
)
.await
@ -1375,13 +1372,14 @@ impl DatabaseStateInitialized {
#[cfg(test)]
mod tests {
use crate::test_utils::make_application;
use super::*;
use data_types::database_rules::{
PartitionTemplate, TemplatePart, WriteBufferConnection, WriteBufferDirection,
};
use data_types::sequence::Sequence;
use entry::{test_helpers::lp_to_entries, SequencedEntry};
use object_store::ObjectStore;
use std::{
convert::{TryFrom, TryInto},
num::NonZeroU32,
@ -1393,10 +1391,7 @@ mod tests {
#[tokio::test]
async fn database_shutdown_waits_for_jobs() {
let application = Arc::new(ApplicationState::new(
Arc::new(ObjectStore::new_in_memory()),
None,
));
let application = make_application();
let database = Database::new(
Arc::clone(&application),
@ -1454,10 +1449,7 @@ mod tests {
async fn initialized_database() -> (Arc<ApplicationState>, Database) {
let server_id = ServerId::try_from(1).unwrap();
let application = Arc::new(ApplicationState::new(
Arc::new(ObjectStore::new_in_memory()),
None,
));
let application = make_application();
let db_name = DatabaseName::new("test").unwrap();
let uuid = Uuid::new_v4();
@ -1594,10 +1586,7 @@ mod tests {
));
// setup application
let application = Arc::new(ApplicationState::new(
Arc::new(ObjectStore::new_in_memory()),
None,
));
let application = make_application();
application
.write_buffer_factory()
.register_mock("my_mock".to_string(), state.clone());

View File

@ -20,6 +20,7 @@ use predicate::{
};
use query::{exec::stringset::StringSet, QueryChunk, QueryChunkMeta};
use read_buffer::RBChunk;
use schema::InfluxColumnType;
use schema::{selection::Selection, sort::SortKey, Schema};
use snafu::{OptionExt, ResultExt, Snafu};
use std::{
@ -237,6 +238,31 @@ impl DbChunk {
debug!(?rub_preds, "RUB delete predicates");
Ok(rub_preds)
}
/// Return true if any of the fields referenced by the `predicate`
/// contain at least one null value. Returns false ONLY if all
/// fields that pass the `predicate` are entirely non-null
fn fields_have_nulls(&self, predicate: &Predicate) -> bool {
self.meta.schema.iter().any(|(influx_column_type, field)| {
if matches!(influx_column_type, Some(InfluxColumnType::Field(_)))
&& predicate.should_include_field(field.name())
{
match self.meta.table_summary.column(field.name()) {
Some(column_summary) => {
// this column contributes nulls only if the count is non-zero; a zero
// count here is what lets the overall check return false
column_summary.null_count() > 0
}
None => {
// don't know the stats for this column, so assume there can be nulls
true
}
}
} else {
// not a field column
false
}
})
}
}
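// Not from the diff: a summary of the read buffer decision implemented in
// apply_predicate_to_metadata below, combining satisfies_predicate with the
// fields_have_nulls helper above:
//
//   satisfies_predicate | fields_have_nulls | PredicateMatch
//   --------------------+-------------------+------------------------
//   false               | (any)             | Zero
//   true                | true              | Unknown
//   true                | false             | AtLeastOneNonNullField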
impl QueryChunk for DbChunk {
@ -264,23 +290,12 @@ impl QueryChunk for DbChunk {
return Ok(PredicateMatch::Zero);
}
// TODO apply predicate pruning here...
let pred_result = match &self.state {
State::MutableBuffer { chunk, .. } => {
if predicate.has_exprs() {
// TODO: Support more predicates
if predicate.has_exprs() || chunk.has_timerange(&predicate.range) {
// TODO: some more work is needed to figure out whether we
// definitely have / do not have a result
PredicateMatch::Unknown
} else if chunk.has_timerange(&predicate.range) {
// Note: this isn't precise / correct: if the
// chunk has the timerange, some other part of the
// predicate may rule out the rows, and thus
// without further work this clause should return
// "Unknown" rather than falsely claiming that
// there is at least one row:
//
// https://github.com/influxdata/influxdb_iox/issues/1590
PredicateMatch::AtLeastOne
} else {
PredicateMatch::Zero
}
@ -305,19 +320,21 @@ impl QueryChunk for DbChunk {
// on meta-data only. This should be possible without needing to
// know the execution engine the chunk is held in.
if chunk.satisfies_predicate(&rb_predicate) {
PredicateMatch::AtLeastOne
// if any of the fields referred to in the
// predicate have nulls, we don't know without more
// work whether the matching rows had non-null values
if self.fields_have_nulls(predicate) {
PredicateMatch::Unknown
} else {
PredicateMatch::AtLeastOneNonNullField
}
} else {
PredicateMatch::Zero
}
}
State::ParquetFile { chunk, .. } => {
if predicate.has_exprs() {
// TODO: Support more predicates
if predicate.has_exprs() || chunk.has_timerange(predicate.range.as_ref()) {
PredicateMatch::Unknown
} else if chunk.has_timerange(predicate.range.as_ref()) {
// As above, this should really be "Unknown" rather than AtLeastOne
// for precision / correctness.
PredicateMatch::AtLeastOne
} else {
PredicateMatch::Zero
}

View File

@ -408,14 +408,11 @@ impl SequenceNumberSection {
#[cfg(test)]
mod tests {
use super::*;
use std::{
convert::TryFrom,
num::{NonZeroU32, NonZeroU64, NonZeroUsize},
sync::Arc,
time::{Duration, Instant},
use crate::{
lifecycle::LifecycleWorker,
utils::{TestDb, TestDbBuilder},
write_buffer::WriteBufferConsumer,
};
use arrow_util::assert_batches_eq;
use data_types::{
database_rules::{PartitionTemplate, Partitioner, TemplatePart},
@ -432,16 +429,18 @@ mod tests {
min_max_sequence::OptionalMinMaxSequence,
};
use query::{exec::ExecutionContextProvider, frontend::sql::SqlQueryPlanner};
use std::{
convert::TryFrom,
num::{NonZeroU32, NonZeroU64, NonZeroUsize},
sync::Arc,
time::{Duration, Instant},
};
use test_helpers::{assert_contains, assert_not_contains, tracing::TracingCapture};
use time::{Time, TimeProvider};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use write_buffer::mock::{MockBufferForReading, MockBufferSharedState};
use crate::lifecycle::LifecycleWorker;
use crate::utils::TestDb;
use crate::write_buffer::WriteBufferConsumer;
#[derive(Debug)]
struct TestSequencedEntry {
sequencer_id: u32,
@ -572,15 +571,17 @@ mod tests {
let write_buffer_state =
MockBufferSharedState::empty_with_n_sequencers(self.n_sequencers);
let (mut test_db, mut shutdown, mut join_handle) = Self::create_test_db(
let test_db_builder = Self::create_test_db_builder(
Arc::clone(&object_store),
server_id,
db_name,
partition_template.clone(),
self.catalog_transactions_until_checkpoint,
Arc::<time::MockProvider>::clone(&time),
)
.await;
);
let (mut test_db, mut shutdown, mut join_handle) =
Self::create_test_db(&test_db_builder).await;
let mut lifecycle = LifecycleWorker::new(Arc::clone(&test_db.db));
@ -620,15 +621,8 @@ mod tests {
drop(test_db);
// then create new one
let (test_db_tmp, shutdown_tmp, join_handle_tmp) = Self::create_test_db(
Arc::clone(&object_store),
server_id,
db_name,
partition_template.clone(),
self.catalog_transactions_until_checkpoint,
Arc::<time::MockProvider>::clone(&time),
)
.await;
let (test_db_tmp, shutdown_tmp, join_handle_tmp) =
Self::create_test_db(&test_db_builder).await;
test_db = test_db_tmp;
shutdown = shutdown_tmp;
join_handle = join_handle_tmp;
@ -759,14 +753,29 @@ mod tests {
}
async fn create_test_db(
builder: &TestDbBuilder,
) -> (TestDb, CancellationToken, JoinHandle<()>) {
let test_db = builder.build().await;
// start background worker
let shutdown: CancellationToken = Default::default();
let shutdown_captured = shutdown.clone();
let db_captured = Arc::clone(&test_db.db);
let join_handle =
tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
(test_db, shutdown, join_handle)
}
fn create_test_db_builder(
object_store: Arc<ObjectStore>,
server_id: ServerId,
db_name: &'static str,
partition_template: PartitionTemplate,
catalog_transactions_until_checkpoint: NonZeroU64,
time_provider: Arc<dyn TimeProvider>,
) -> (TestDb, CancellationToken, JoinHandle<()>) {
let test_db = TestDb::builder()
) -> TestDbBuilder {
TestDb::builder()
.object_store(object_store)
.server_id(server_id)
.lifecycle_rules(data_types::database_rules::LifecycleRules {
@ -779,17 +788,6 @@ mod tests {
.partition_template(partition_template)
.time_provider(time_provider)
.db_name(db_name)
.build()
.await;
// start background worker
let shutdown: CancellationToken = Default::default();
let shutdown_captured = shutdown.clone();
let db_captured = Arc::clone(&test_db.db);
let join_handle =
tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
(test_db, shutdown, join_handle)
}
/// Evaluates given checks.

View File

@ -793,23 +793,7 @@ where
Ok(())
}
/// List deleted databases in object storage.
pub async fn list_deleted_databases(&self) -> Result<Vec<DetailedDatabase>> {
let server_id = {
let state = self.shared.state.read();
let initialized = state.initialized()?;
initialized.server_id
};
Ok(IoxObjectStore::list_deleted_databases(
self.shared.application.object_store(),
server_id,
)
.await
.context(ListDeletedDatabases)?)
}
/// List all databases, active and deleted, in object storage, including their generation IDs.
/// List this server's databases in object storage, including their generation IDs.
pub async fn list_detailed_databases(&self) -> Result<Vec<DetailedDatabase>> {
let server_id = {
let state = self.shared.state.read();
@ -892,6 +876,9 @@ where
// immediately to the client and abort all other outstanding requests.
futures_util::future::try_join_all(sharded_entries.into_iter().map(
|sharded_entry| async {
// capture entire entry in closure
let sharded_entry = sharded_entry;
let sink = match &rules.routing_rules {
Some(RoutingRules::ShardConfig(shard_config)) => {
let id = sharded_entry.shard_id.expect("sharded entry");
@ -1205,7 +1192,13 @@ async fn maybe_initialize_server(shared: &ServerShared) {
init_ready.server_id,
)
.await
.map_err(|e| InitError::GetServerConfig { source: e })
.or_else(|e| match e {
// If this is the first time starting up this server and there is no config file yet,
// this isn't a problem. Start an empty server config.
object_store::Error::NotFound { .. } => Ok(bytes::Bytes::new()),
// Any other error is a problem.
_ => Err(InitError::GetServerConfig { source: e }),
})
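// Not part of the diff: the empty `Bytes` returned above decodes into a
// default (empty) server config in the `and_then` below, so a freshly
// started server simply begins with no owned databases.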
.and_then(|config| {
generated_types::server_config::decode_persisted_server_config(config)
.map_err(|e| InitError::DeserializeServerConfig { source: e })
@ -1220,21 +1213,9 @@ async fn maybe_initialize_server(shared: &ServerShared) {
location,
)
})
.collect()
.collect::<Vec<_>>()
});
// TODO: This is a temporary fallback until the transition to server config files being the
// source of truth for database name and location is finished.
let maybe_databases = match maybe_databases {
Ok(maybe) => Ok(maybe),
Err(_) => IoxObjectStore::list_possible_databases(
shared.application.object_store(),
init_ready.server_id,
)
.await
.map_err(|e| InitError::ListDatabases { source: e }),
};
let next_state = match maybe_databases {
Ok(databases) => {
let mut state = ServerStateInitialized {
@ -1368,6 +1349,7 @@ pub mod test_utils {
Arc::new(ApplicationState::new(
Arc::new(ObjectStore::new_in_memory()),
None,
None,
))
}
@ -1646,7 +1628,7 @@ mod tests {
}
#[tokio::test]
async fn load_databases_and_transition_to_server_config() {
async fn load_databases() {
let application = make_application();
let server = make_server(Arc::clone(&application));
@ -1678,13 +1660,6 @@ mod tests {
.await
.expect("cannot delete rules file");
// delete server config file - this is not something that's supposed to happen but is
// what will happen during the transition to using the server config file
let mut path = application.object_store().new_path();
path.push_dir(server_id.to_string());
path.set_file_name("config.pb");
application.object_store().delete(&path).await.unwrap();
let server = make_server(Arc::clone(&application));
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server.wait_for_init().await.unwrap();
@ -2107,15 +2082,19 @@ mod tests {
async fn init_error_generic() {
// use an object store that will hopefully fail to read
let store = Arc::new(ObjectStore::new_failing_store().unwrap());
let application = Arc::new(ApplicationState::new(store, None));
let application = Arc::new(ApplicationState::new(store, None, None));
let server = make_server(application);
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
let err = server.wait_for_init().await.unwrap_err();
assert!(matches!(err.as_ref(), InitError::ListDatabases { .. }));
assert!(
matches!(err.as_ref(), InitError::GetServerConfig { .. }),
"got: {:?}",
err
);
assert_contains!(
server.server_init_error().unwrap().to_string(),
"error listing databases in object storage:"
"error getting server config from object storage:"
);
}
@ -2233,24 +2212,27 @@ mod tests {
let foo_db_name = DatabaseName::new("foo").unwrap();
// create a directory in object storage that looks like it could
// be a database directory, but doesn't have any valid generation
// directories in it
let mut fake_db_path = application.object_store().new_path();
fake_db_path.push_all_dirs(&[server_id.to_string().as_str(), foo_db_name.as_str()]);
let mut not_generation_file = fake_db_path.clone();
not_generation_file.set_file_name("not-a-generation");
application
.object_store()
.put(&not_generation_file, Bytes::new())
.await
.unwrap();
// start server
let server = make_server(Arc::clone(&application));
server.set_id(server_id).unwrap();
server.wait_for_init().await.unwrap();
// create database
create_simple_database(&server, &foo_db_name)
.await
.expect("failed to create database");
// delete database
server
.delete_database(&foo_db_name)
.await
.expect("failed to delete database");
// restart server
let server = make_server(Arc::clone(&application));
server.set_id(server_id).unwrap();
server.wait_for_init().await.unwrap();
// generic error MUST NOT be set
assert!(server.server_init_error().is_none());

View File

@ -10,6 +10,7 @@ use tokio_util::sync::CancellationToken;
use entry::SequencedEntry;
use observability_deps::tracing::{debug, error, info, warn};
use trace::span::SpanRecorder;
use write_buffer::core::{FetchHighWatermark, WriteBufferError, WriteBufferReading};
use crate::Db;
@ -151,12 +152,20 @@ async fn stream_in_sequenced_entries<'a>(
// store entry
let mut logged_hard_limit = false;
loop {
let mut span_recorder = SpanRecorder::new(
sequenced_entry
.span_context()
.map(|parent| parent.child("IOx write buffer")),
);
match db.store_sequenced_entry(
Arc::clone(&sequenced_entry),
crate::db::filter_table_batch_keep_all,
) {
Ok(_) => {
metrics.success();
span_recorder.ok("stored entry");
break;
}
Err(crate::db::Error::HardLimitReached {}) => {
@ -169,6 +178,8 @@ async fn stream_in_sequenced_entries<'a>(
);
logged_hard_limit = true;
}
span_recorder.error("hard limit reached");
tokio::time::sleep(Duration::from_millis(100)).await;
continue;
}
@ -179,6 +190,7 @@ async fn stream_in_sequenced_entries<'a>(
sequencer_id,
"Error storing SequencedEntry from write buffer in database"
);
span_recorder.error("cannot store entry");
// no retry
break;

View File

@ -2,7 +2,7 @@
name = "server_benchmarks"
version = "0.1.0"
authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
edition = "2018"
edition = "2021"
description = "Server related bechmarks, grouped into their own crate to minimize build dev build times"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

View File

@ -139,12 +139,8 @@ struct Create {
/// Get list of databases
#[derive(Debug, StructOpt)]
struct List {
/// Whether to list databases marked as deleted instead, to restore or permanently delete.
#[structopt(long)]
deleted: bool,
/// Whether to list detailed information, including generation IDs, about all databases,
/// whether they are active or marked as deleted.
/// Whether to list detailed information about the databases, such as generation IDs along
/// with their names.
#[structopt(long)]
detailed: bool,
}
@ -264,12 +260,8 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
}
Command::List(list) => {
let mut client = management::Client::new(connection);
if list.deleted || list.detailed {
let databases = if list.deleted {
client.list_deleted_databases().await?
} else {
client.list_detailed_databases().await?
};
if list.detailed {
let databases = client.list_detailed_databases().await?;
let mut table = Table::new();
table.load_preset("||--+-++| ++++++");

View File

@ -79,7 +79,10 @@ async fn wait_for_signal() {
let _ = tokio::signal::ctrl_c().await;
}
async fn make_application(config: &Config) -> Result<Arc<ApplicationState>> {
async fn make_application(
config: &Config,
trace_collector: Option<Arc<dyn TraceCollector>>,
) -> Result<Arc<ApplicationState>> {
warn_about_inmem_store(&config.object_store_config);
let object_store =
ObjectStore::try_from(&config.object_store_config).context(ObjectStoreParsing)?;
@ -91,6 +94,7 @@ async fn make_application(config: &Config) -> Result<Arc<ApplicationState>> {
Ok(Arc::new(ApplicationState::new(
object_storage,
config.num_worker_threads,
trace_collector,
)))
}
@ -178,7 +182,11 @@ pub async fn main(config: Config) -> Result<()> {
let f = SendPanicsToTracing::new();
std::mem::forget(f);
let application = make_application(&config).await?;
let async_exporter = config.tracing_config.build().context(Tracing)?;
let trace_collector = async_exporter
.clone()
.map(|x| -> Arc<dyn TraceCollector> { x });
let application = make_application(&config, trace_collector).await?;
// Register jemalloc metrics
application
@ -189,17 +197,12 @@ pub async fn main(config: Config) -> Result<()> {
let grpc_listener = grpc_listener(config.grpc_bind_address).await?;
let http_listener = http_listener(config.http_bind_address).await?;
let async_exporter = config.tracing_config.build().context(Tracing)?;
let trace_collector = async_exporter
.clone()
.map(|x| -> Arc<dyn TraceCollector> { x });
let r = serve(
config,
application,
grpc_listener,
http_listener,
trace_collector,
app_server,
)
.await;
@ -241,7 +244,6 @@ async fn serve(
application: Arc<ApplicationState>,
grpc_listener: tokio::net::TcpListener,
http_listener: AddrIncoming,
trace_collector: Option<Arc<dyn TraceCollector>>,
app_server: Arc<AppServer<ConnectionManager>>,
) -> Result<()> {
// Construct a token to trigger shutdown of API services
@ -262,7 +264,6 @@ async fn serve(
Arc::clone(&application),
Arc::clone(&app_server),
trace_header_parser.clone(),
trace_collector.clone(),
frontend_shutdown.clone(),
config.initial_serving_state.into(),
)
@ -279,7 +280,6 @@ async fn serve(
frontend_shutdown.clone(),
max_http_request_size,
trace_header_parser,
trace_collector,
)
.fuse();
info!("HTTP server listening");
@ -381,7 +381,7 @@ mod tests {
use super::*;
use ::http::{header::HeaderName, HeaderValue};
use data_types::{database_rules::DatabaseRules, DatabaseName};
use influxdb_iox_client::connection::Connection;
use influxdb_iox_client::{connection::Connection, flight::PerformQuery};
use server::rules::ProvidedDatabaseRules;
use std::{convert::TryInto, num::NonZeroU64};
use structopt::StructOpt;
@ -412,16 +412,9 @@ mod tests {
let grpc_listener = grpc_listener(config.grpc_bind_address).await.unwrap();
let http_listener = http_listener(config.grpc_bind_address).await.unwrap();
serve(
config,
application,
grpc_listener,
http_listener,
None,
server,
)
.await
.unwrap()
serve(config, application, grpc_listener, http_listener, server)
.await
.unwrap()
}
#[tokio::test]
@ -430,7 +423,7 @@ mod tests {
// Create a server and wait for it to initialize
let config = test_config(Some(23));
let application = make_application(&config).await.unwrap();
let application = make_application(&config, None).await.unwrap();
let server = make_server(Arc::clone(&application), &config);
server.wait_for_init().await.unwrap();
@ -458,7 +451,7 @@ mod tests {
async fn test_server_shutdown_uninit() {
// Create a server but don't set a server id
let config = test_config(None);
let application = make_application(&config).await.unwrap();
let application = make_application(&config, None).await.unwrap();
let server = make_server(Arc::clone(&application), &config);
let serve_fut = test_serve(config, application, Arc::clone(&server)).fuse();
@ -489,7 +482,7 @@ mod tests {
async fn test_server_panic() {
// Create a server and wait for it to initialize
let config = test_config(Some(999999999));
let application = make_application(&config).await.unwrap();
let application = make_application(&config, None).await.unwrap();
let server = make_server(Arc::clone(&application), &config);
server.wait_for_init().await.unwrap();
@ -516,7 +509,7 @@ mod tests {
async fn test_database_panic() {
// Create a server and wait for it to initialize
let config = test_config(Some(23));
let application = make_application(&config).await.unwrap();
let application = make_application(&config, None).await.unwrap();
let server = make_server(Arc::clone(&application), &config);
server.wait_for_init().await.unwrap();
@ -597,7 +590,9 @@ mod tests {
JoinHandle<Result<()>>,
) {
let config = test_config(Some(23));
let application = make_application(&config).await.unwrap();
let application = make_application(&config, Some(Arc::<T>::clone(collector)))
.await
.unwrap();
let server = make_server(Arc::clone(&application), &config);
server.wait_for_init().await.unwrap();
@ -611,7 +606,6 @@ mod tests {
application,
grpc_listener,
http_listener,
Some(Arc::<T>::clone(collector)),
Arc::clone(&server),
);
@ -690,6 +684,11 @@ mod tests {
join.await.unwrap().unwrap();
}
/// Ensure that query is fully executed.
async fn consume_query(mut query: PerformQuery) {
while query.next().await.unwrap().is_some() {}
}
#[tokio::test]
async fn test_query_tracing() {
let collector = Arc::new(RingBufferTraceCollector::new(100));
@ -721,10 +720,13 @@ mod tests {
.unwrap();
let mut flight = influxdb_iox_client::flight::Client::new(conn.clone());
flight
.perform_query(db_info.db_name(), "select * from cpu;")
.await
.unwrap();
consume_query(
flight
.perform_query(db_info.db_name(), "select * from cpu;")
.await
.unwrap(),
)
.await;
flight
.perform_query("nonexistent", "select * from cpu;")

View File

@ -52,7 +52,6 @@ use std::{
};
use tokio_util::sync::CancellationToken;
use tower::Layer;
use trace::TraceCollector;
use trace_http::tower::TraceLayer;
/// Constants used in API error codes.
@ -865,12 +864,12 @@ pub async fn serve<M>(
shutdown: CancellationToken,
max_request_size: usize,
trace_header_parser: TraceHeaderParser,
trace_collector: Option<Arc<dyn TraceCollector>>,
) -> Result<(), hyper::Error>
where
M: ConnectionManager + Send + Sync + Debug + 'static,
{
let metric_registry = Arc::clone(application.metric_registry());
let trace_collector = application.trace_collector().clone();
let trace_layer = TraceLayer::new(trace_header_parser, metric_registry, trace_collector, false);
let lp_metrics = Arc::new(LineProtocolMetrics::new(
@ -924,6 +923,7 @@ mod tests {
Arc::new(ApplicationState::new(
Arc::new(ObjectStore::new_in_memory()),
None,
None,
))
}
@ -939,7 +939,7 @@ mod tests {
async fn test_health() {
let application = make_application();
let app_server = make_server(Arc::clone(&application));
let server_url = test_server(application, Arc::clone(&app_server), None);
let server_url = test_server(application, Arc::clone(&app_server));
let client = Client::new();
let response = client.get(&format!("{}/health", server_url)).send().await;
@ -958,7 +958,7 @@ mod tests {
.register_metric("my_metric", "description");
let app_server = make_server(Arc::clone(&application));
let server_url = test_server(application, Arc::clone(&app_server), None);
let server_url = test_server(application, Arc::clone(&app_server));
metric.recorder(&[("tag", "value")]).inc(20);
@ -998,15 +998,15 @@ mod tests {
#[tokio::test]
async fn test_tracing() {
let application = make_application();
let app_server = make_server(Arc::clone(&application));
let trace_collector = Arc::new(RingBufferTraceCollector::new(5));
let server_url = test_server(
application,
Arc::clone(&app_server),
let application = Arc::new(ApplicationState::new(
Arc::new(ObjectStore::new_in_memory()),
None,
Some(Arc::<RingBufferTraceCollector>::clone(&trace_collector)),
);
));
let app_server = make_server(Arc::clone(&application));
let server_url = test_server(application, Arc::clone(&app_server));
let client = Client::new();
let response = client
@ -1036,7 +1036,7 @@ mod tests {
.create_database(make_rules("MyOrg_MyBucket"))
.await
.unwrap();
let server_url = test_server(application, Arc::clone(&app_server), None);
let server_url = test_server(application, Arc::clone(&app_server));
let client = Client::new();
@ -1083,7 +1083,7 @@ mod tests {
.create_database(make_rules("MyOrg_MyBucket"))
.await
.unwrap();
let server_url = test_server(application, Arc::clone(&app_server), None);
let server_url = test_server(application, Arc::clone(&app_server));
// Set up client
let client = Client::new();
@ -1209,7 +1209,7 @@ mod tests {
.await
.unwrap();
let server_url = test_server(application, Arc::clone(&app_server), None);
let server_url = test_server(application, Arc::clone(&app_server));
let client = Client::new();
@ -1399,7 +1399,7 @@ mod tests {
.create_database(make_rules("MyOrg_MyBucket"))
.await
.unwrap();
let server_url = test_server(application, Arc::clone(&app_server), None);
let server_url = test_server(application, Arc::clone(&app_server));
let client = Client::new();
@ -1693,7 +1693,6 @@ mod tests {
fn test_server(
application: Arc<ApplicationState>,
server: Arc<AppServer<ConnectionManagerImpl>>,
trace_collector: Option<Arc<dyn TraceCollector>>,
) -> String {
// NB: specify port 0 to let the OS pick the port.
let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
@ -1710,7 +1709,6 @@ mod tests {
CancellationToken::new(),
TEST_MAX_REQUEST_SIZE,
trace_header_parser,
trace_collector,
));
println!("Started server at {}", server_url);
server_url
@ -1734,7 +1732,7 @@ mod tests {
.create_database(make_rules("MyOrg_MyBucket"))
.await
.unwrap();
let server_url = test_server(application, Arc::clone(&app_server), None);
let server_url = test_server(application, Arc::clone(&app_server));
(app_server, server_url)
}

View File

@ -7,11 +7,7 @@ use query::{
exec::IOxExecutionContext,
frontend::{influxrpc::InfluxRpcPlanner, sql::SqlQueryPlanner},
group_by::{Aggregate, WindowDuration},
plan::{
fieldlist::FieldListPlan,
seriesset::SeriesSetPlans,
stringset::{StringSetPlan, TableNamePlanBuilder},
},
plan::{fieldlist::FieldListPlan, seriesset::SeriesSetPlans, stringset::StringSetPlan},
QueryDatabase,
};
@ -54,7 +50,7 @@ impl Planner {
&self,
database: Arc<D>,
predicate: Predicate,
) -> Result<TableNamePlanBuilder>
) -> Result<StringSetPlan>
where
D: QueryDatabase + 'static,
{

View File

@ -11,7 +11,6 @@ use trace_http::ctx::TraceHeaderParser;
use crate::influxdb_ioxd::serving_readiness::ServingReadiness;
use server::{connection::ConnectionManager, ApplicationState, Server};
use trace::TraceCollector;
pub mod error;
mod flight;
@ -90,7 +89,6 @@ pub async fn serve<M>(
application: Arc<ApplicationState>,
server: Arc<Server<M>>,
trace_header_parser: TraceHeaderParser,
trace_collector: Option<Arc<dyn TraceCollector>>,
shutdown: CancellationToken,
serving_readiness: ServingReadiness,
) -> Result<()>
@ -109,7 +107,7 @@ where
let mut builder = builder.layer(trace_http::tower::TraceLayer::new(
trace_header_parser,
Arc::clone(application.metric_registry()),
trace_collector,
application.trace_collector().clone(),
true,
));

View File

@ -199,24 +199,6 @@ where
Ok(Response::new(RestoreDatabaseResponse {}))
}
async fn list_deleted_databases(
&self,
_: Request<ListDeletedDatabasesRequest>,
) -> Result<Response<ListDeletedDatabasesResponse>, Status> {
let deleted_databases = self
.server
.list_deleted_databases()
.await
.map_err(default_server_error_handler)?
.into_iter()
.map(Into::into)
.collect();
Ok(Response::new(ListDeletedDatabasesResponse {
deleted_databases,
}))
}
async fn list_detailed_databases(
&self,
_: Request<ListDetailedDatabasesRequest>,

View File

@ -724,14 +724,14 @@ where
let db = db_store.db(db_name).context(DatabaseNotFound { db_name })?;
let ctx = db.new_query_context(span_ctx);
let builder = Planner::new(&ctx)
let plan = Planner::new(&ctx)
.table_names(db, predicate)
.await
.map_err(|e| Box::new(e) as _)
.context(ListingTables { db_name })?;
let table_names = ctx
.to_table_names(builder)
.to_string_set(plan)
.await
.map_err(|e| Box::new(e) as _)
.context(ListingTables { db_name })?;
@ -1116,11 +1116,11 @@ mod tests {
let chunk0 = TestChunk::new("h2o")
.with_id(0)
.with_predicate_match(PredicateMatch::AtLeastOne);
.with_predicate_match(PredicateMatch::AtLeastOneNonNullField);
let chunk1 = TestChunk::new("o2")
.with_id(1)
.with_predicate_match(PredicateMatch::AtLeastOne);
.with_predicate_match(PredicateMatch::AtLeastOneNonNullField);
fixture
.test_storage
@ -1474,7 +1474,8 @@ mod tests {
tag_key: [0].into(),
};
let chunk = TestChunk::new("h2o").with_predicate_match(PredicateMatch::AtLeastOne);
let chunk =
TestChunk::new("h2o").with_predicate_match(PredicateMatch::AtLeastOneNonNullField);
fixture
.test_storage

View File

@ -2,11 +2,11 @@
name = "test_helpers"
version = "0.1.0"
authors = ["Paul Dix <paul@pauldix.net>"]
edition = "2018"
edition = "2021"
[dependencies] # In alphabetical order
dotenv = "0.15.0"
parking_lot = "0.11.2"
tempfile = "3.1.0"
tracing-subscriber = "0.2"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
observability_deps = { path = "../observability_deps" }

View File

@ -292,7 +292,7 @@ struct TestServer {
}
// Options for creating test servers
#[derive(Default, Debug)]
#[derive(Default, Debug, Clone)]
pub struct TestConfig {
/// Additional environment variables
env: Vec<(String, String)>,

View File

@ -237,15 +237,6 @@ async fn test_list_databases() {
assert!(rules.lifecycle_rules.is_some());
}
// validate that neither database appears in the list of deleted databases
let deleted_databases = client
.list_deleted_databases()
.await
.expect("list deleted databases failed");
let names: Vec<_> = deleted_databases.into_iter().map(|db| db.db_name).collect();
assert!(!names.contains(&name1));
assert!(!names.contains(&name2));
// now fetch without defaults, and neither should have their rules filled in
let omit_defaults = true;
let databases: Vec<_> = client
@ -299,20 +290,6 @@ async fn test_list_databases() {
let names: Vec<_> = databases.iter().map(|rules| rules.name.clone()).collect();
assert!(!dbg!(&names).contains(&name1));
assert!(dbg!(&names).contains(&name2));
// The deleted database should be included in the list of deleted databases
let deleted_databases = client
.list_deleted_databases()
.await
.expect("list deleted databases failed");
assert!(
deleted_databases
.iter()
.any(|db| db.db_name == name1 && db.generation_id == 0),
"could not find expected database in {:?}",
deleted_databases
);
}
#[tokio::test]
@ -1144,8 +1121,8 @@ async fn test_get_server_status_global_error() {
let server_fixture = ServerFixture::create_single_use().await;
let mut client = server_fixture.management_client();
// we need to "break" the object store AFTER the server was started, otherwise the server process will exit
// immediately
// we need to "break" the object store AFTER the server was started, otherwise the server
// process will exit immediately
let metadata = server_fixture.dir().metadata().unwrap();
let mut permissions = metadata.permissions();
permissions.set_mode(0o000);
@ -1160,7 +1137,8 @@ async fn test_get_server_status_global_error() {
loop {
let status = client.get_server_status().await.unwrap();
if let Some(err) = status.error {
assert!(dbg!(err.message).starts_with("error listing databases in object storage:"));
assert!(dbg!(err.message)
.starts_with("error getting server config from object storage:"));
assert!(status.database_statuses.is_empty());
return;
}
@ -1231,6 +1209,33 @@ async fn test_get_server_status_db_error() {
other_gen_path.push("rules.pb");
std::fs::write(other_gen_path, "foo").unwrap();
// create the server config listing the ownership of these three databases
let mut path = server_fixture.dir().to_path_buf();
path.push("42");
path.push("config.pb");
let data = ServerConfig {
databases: vec![
(String::from("my_db"), String::from("42/my_db")),
(
String::from("soft_deleted"),
String::from("42/soft_deleted"),
),
(
String::from("multiple_active"),
String::from("42/multiple_active"),
),
]
.into_iter()
.collect(),
};
let mut encoded = bytes::BytesMut::new();
generated_types::server_config::encode_persisted_server_config(&data, &mut encoded)
.expect("server config serialization should be valid");
let encoded = encoded.freeze();
std::fs::write(path, encoded).unwrap();
// initialize
client.update_server_id(42).await.expect("set ID failed");
server_fixture.wait_server_initialized().await;

View File

@ -232,18 +232,6 @@ async fn delete_database() {
.success()
.stdout(predicate::str::contains(db));
// Listing deleted databases does not include the newly created, active database
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("database")
.arg("list")
.arg("--deleted")
.arg("--host")
.arg(addr)
.assert()
.success()
.stdout(predicate::str::contains(db).not());
// Listing detailed database info does include the active database, along with its generation
Command::cargo_bin("influxdb_iox")
.unwrap()
@ -279,18 +267,6 @@ async fn delete_database() {
.success()
.stdout(predicate::str::contains(db).not());
// ... unless we ask to list deleted databases
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("database")
.arg("list")
.arg("--deleted")
.arg("--host")
.arg(addr)
.assert()
.success()
.stdout(deleted_db_match(db, 0));
// Listing detailed database info does include the deleted database
Command::cargo_bin("influxdb_iox")
.unwrap()
@ -340,18 +316,6 @@ async fn delete_database() {
.success()
.stdout(predicate::str::contains(db));
// And the one deleted database will be in the deleted list
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("database")
.arg("list")
.arg("--deleted")
.arg("--host")
.arg(addr)
.assert()
.success()
.stdout(deleted_db_match(db, 0));
// Listing detailed database info includes both active and deleted
Command::cargo_bin("influxdb_iox")
.unwrap()
@ -387,18 +351,6 @@ async fn delete_database() {
.success()
.stdout(predicate::str::contains(db).not());
// The 2 generations of the database should be in the deleted list
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("database")
.arg("list")
.arg("--deleted")
.arg("--host")
.arg(addr)
.assert()
.success()
.stdout(deleted_db_match(db, 0).and(deleted_db_match(db, 1)));
// Listing detailed database info includes both deleted generations
Command::cargo_bin("influxdb_iox")
.unwrap()
@ -438,18 +390,6 @@ async fn delete_database() {
.success()
.stdout(predicate::str::contains(db));
// Only generation 1 is in the deleted list
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("database")
.arg("list")
.arg("--deleted")
.arg("--host")
.arg(addr)
.assert()
.success()
.stdout(deleted_db_match(db, 0).not().and(deleted_db_match(db, 1)));
// Listing detailed database info includes both active and deleted
Command::cargo_bin("influxdb_iox")
.unwrap()

View File

@ -1,5 +1,8 @@
use crate::{
common::server_fixture::ServerFixture,
common::{
server_fixture::{ServerFixture, TestConfig},
udp_listener::UdpCapture,
},
end_to_end_cases::scenario::{rand_name, DatabaseBuilder},
};
use arrow_util::assert_batches_sorted_eq;
@ -17,6 +20,7 @@ use rdkafka::{
ClientConfig, Message, Offset, TopicPartitionList,
};
use std::convert::TryFrom;
use tempfile::TempDir;
use test_helpers::assert_contains;
use write_buffer::{kafka::test_utils::kafka_sequencer_options, maybe_skip_kafka_integration};
@ -325,3 +329,89 @@ async fn test_create_database_missing_write_buffer_sequencers() {
&err
);
}
#[tokio::test]
pub async fn test_cross_write_buffer_tracing() {
let write_buffer_dir = TempDir::new().unwrap();
// set up tracing
let udp_capture = UdpCapture::new().await;
let test_config = TestConfig::new()
.with_env("TRACES_EXPORTER", "jaeger")
.with_env("TRACES_EXPORTER_JAEGER_AGENT_HOST", udp_capture.ip())
.with_env("TRACES_EXPORTER_JAEGER_AGENT_PORT", udp_capture.port())
.with_client_header("jaeger-debug-id", "some-debug-id");
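// (the TRACES_EXPORTER* variables point both servers' Jaeger UDP exporter at
// the capture socket above; the `jaeger-debug-id` client header is presumably
// there to force the traced requests to be sampled)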
// we need to use two servers but the same DB name here because the Kafka topic is named after the DB name
let db_name = rand_name();
// create producer server
let server_write = ServerFixture::create_single_use_with_config(test_config.clone()).await;
server_write
.management_client()
.update_server_id(1)
.await
.unwrap();
server_write.wait_server_initialized().await;
let conn_write = WriteBufferConnection {
direction: WriteBufferDirection::Write.into(),
r#type: "file".to_string(),
connection: write_buffer_dir.path().display().to_string(),
creation_config: Some(WriteBufferCreationConfig {
n_sequencers: 1,
options: kafka_sequencer_options(),
}),
..Default::default()
};
DatabaseBuilder::new(db_name.clone())
.write_buffer(conn_write.clone())
.build(server_write.grpc_channel())
.await;
// create consumer DB
let server_read = ServerFixture::create_single_use_with_config(test_config).await;
server_read
.management_client()
.update_server_id(2)
.await
.unwrap();
server_read.wait_server_initialized().await;
let conn_read = WriteBufferConnection {
direction: WriteBufferDirection::Read.into(),
..conn_write
};
DatabaseBuilder::new(db_name.clone())
.write_buffer(conn_read)
.build(server_read.grpc_channel())
.await;
// write some points
let mut write_client = server_write.write_client();
let lp_lines = [
"cpu,region=west user=23.2 100",
"cpu,region=west user=21.0 150",
"disk,region=east bytes=99i 200",
];
let num_lines_written = write_client
.write(&db_name, lp_lines.join("\n"))
.await
.expect("cannot write");
assert_eq!(num_lines_written, 3);
// "shallow" packet inspection and verify the UDP server got
// something that had some expected results (maybe we could
// eventually verify the payload here too)
udp_capture
.wait_for(|m| m.to_string().contains("IOx write buffer"))
.await;
udp_capture
.wait_for(|m| m.to_string().contains("stored entry"))
.await;
// // debugging assistance
// tokio::time::sleep(std::time::Duration::from_secs(10)).await;
// println!("Traces received (1):\n\n{:#?}", udp_capture.messages());
// wait for the UDP server to shut down
udp_capture.stop().await
}
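// For context, `UdpCapture` (from `common::udp_listener`) is the piece that
// receives the Jaeger agent's UDP packets in the test above. A minimal,
// standalone sketch of that kind of helper, assuming a tokio runtime and NOT
// the project's actual implementation, could look like this:
use std::sync::{Arc, Mutex};
use tokio::net::UdpSocket;
pub struct CapturedUdp {
messages: Arc<Mutex<Vec<Vec<u8>>>>,
local_addr: std::net::SocketAddr,
}
impl CapturedUdp {
pub async fn new() -> std::io::Result<Self> {
let socket = UdpSocket::bind("127.0.0.1:0").await?;
let local_addr = socket.local_addr()?;
let messages = Arc::new(Mutex::new(Vec::new()));
let sink = Arc::clone(&messages);
// collect every datagram in the background until the socket errors out
tokio::spawn(async move {
let mut buf = vec![0u8; 64 * 1024];
while let Ok((n, _src)) = socket.recv_from(&mut buf).await {
sink.lock().unwrap().push(buf[..n].to_vec());
}
});
Ok(Self { messages, local_addr })
}
// address pieces in the string form the `TRACES_EXPORTER_JAEGER_AGENT_*`
// environment variables expect
pub fn ip(&self) -> String {
self.local_addr.ip().to_string()
}
pub fn port(&self) -> String {
self.local_addr.port().to_string()
}
pub fn messages(&self) -> Vec<Vec<u8>> {
self.messages.lock().unwrap().clone()
}
}
// (the real `UdpCapture` additionally decodes the agent payload and exposes the
// `wait_for`/`stop`/`messages` helpers used in the test above)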

View File

@ -1,7 +1,7 @@
[package]
name = "time"
version = "0.1.0"
edition = "2018"
edition = "2021"
description = "Time functionality for IOx"
[dependencies]

View File

@ -2,7 +2,7 @@
name = "trace"
version = "0.1.0"
authors = ["Raphael Taylor-Davies <r.taylordavies@googlemail.com>"]
edition = "2018"
edition = "2021"
description = "Distributed tracing support within IOx"
[dependencies]

View File

@ -2,7 +2,7 @@
name = "trace_exporters"
version = "0.1.0"
authors = ["Raphael Taylor-Davies <r.taylordavies@googlemail.com>"]
edition = "2018"
edition = "2021"
description = "Additional tracing exporters for IOx"
[dependencies]

View File

@ -2,7 +2,7 @@
name = "trace_http"
version = "0.1.0"
authors = ["Raphael Taylor-Davies <r.taylordavies@googlemail.com>"]
edition = "2018"
edition = "2021"
description = "Distributed tracing support for HTTP services"
[dependencies]

View File

@ -2,7 +2,7 @@
name = "tracker"
version = "0.1.0"
authors = ["Raphael Taylor-Davies <r.taylordavies@googlemail.com>"]
edition = "2018"
edition = "2021"
description = "Utilities for tracking resource utilisation within IOx"
[dependencies]

View File

@ -2,7 +2,7 @@
name = "trogging"
version = "0.1.0"
authors = ["Marko Mikulicic <mkm@influxdata.com>"]
edition = "2018"
edition = "2021"
description = "IOx logging pipeline built upon tokio-tracing"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@ -12,7 +12,7 @@ logfmt = { path = "../logfmt" }
observability_deps = { path = "../observability_deps" }
thiserror = "1.0.30"
tracing-log = "0.1"
tracing-subscriber = "0.2"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
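# note: `env-filter` and `json` are not default features of tracing-subscriber 0.3,
# so they have to be requested explicitly here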
structopt = { version = "0.3.25", optional = true }
[dev-dependencies]

View File

@ -105,7 +105,7 @@ impl LoggingConfig {
pub fn with_builder<W>(&self, builder: Builder<W>) -> Builder<BoxMakeWriter>
where
W: MakeWriter + Send + Sync + Clone + 'static,
W: for<'writer> MakeWriter<'writer> + Send + Sync + Clone + 'static,
{
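// `for<'writer> MakeWriter<'writer>` is a higher-ranked trait bound:
// tracing-subscriber 0.3 parameterizes `MakeWriter` by the lifetime of the
// `&self` borrow in `make_writer`, so code that is generic over a writer now
// has to require the impl for every lifetime instead of the single,
// lifetime-less `MakeWriter` trait that 0.2 had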
builder
.with_log_filter(&self.log_filter)
@ -129,7 +129,7 @@ pub trait LoggingConfigBuilderExt {
impl<W> LoggingConfigBuilderExt for Builder<W>
where
W: MakeWriter + Send + Sync + Clone + 'static,
W: for<'writer> MakeWriter<'writer> + Send + Sync + Clone + 'static,
{
fn with_logging_config(self, config: &LoggingConfig) -> Builder<BoxMakeWriter> {
config.with_builder(self)

View File

@ -86,7 +86,7 @@ impl Builder {
impl<W> Builder<W> {
pub fn with_writer<W2>(self, make_writer: W2) -> Builder<W2>
where
W2: MakeWriter + Send + Sync + 'static,
W2: for<'writer> MakeWriter<'writer> + Send + Sync + 'static,
{
Builder::<W2> {
make_writer,
@ -103,7 +103,7 @@ impl<W> Builder<W> {
// This needs to be a separate impl block because they place different bounds on the type parameters.
impl<W> Builder<W>
where
W: MakeWriter + Send + Sync + 'static,
W: for<'writer> MakeWriter<'writer> + Send + Sync + 'static,
{
pub const DEFAULT_LOG_FILTER: &'static str = "warn";
@ -277,17 +277,30 @@ impl Drop for TroggingGuard {
fn make_writer<M>(m: M) -> BoxMakeWriter
where
M: MakeWriter + Send + Sync + 'static,
M: for<'writer> MakeWriter<'writer> + Send + Sync + 'static,
{
fmt::writer::BoxMakeWriter::new(move || {
std::io::LineWriter::with_capacity(
MAX_LINE_LENGTH,
LimitedWriter(MAX_LINE_LENGTH, m.make_writer()),
)
BoxMakeWriter::new(MakeWriterHelper {
inner: BoxMakeWriter::new(m),
})
}
struct MakeWriterHelper {
inner: BoxMakeWriter,
}
impl<'a> MakeWriter<'a> for MakeWriterHelper {
type Writer = Box<dyn Write + 'a>;
fn make_writer(&'a self) -> Self::Writer {
Box::new(std::io::LineWriter::with_capacity(
MAX_LINE_LENGTH,
LimitedWriter(MAX_LINE_LENGTH, self.inner.make_writer()),
))
}
}
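// Why the helper above is needed (hedged explanation): with tracing-subscriber
// 0.3 the writer returned by `make_writer(&'a self)` may borrow from `self` for
// the `'writer` lifetime, so the 0.2-style closure shim
// (`BoxMakeWriter::new(move || ...)`) can no longer express the wrapping; going
// through an explicit `MakeWriter` impl that returns `Box<dyn Write + 'a>`
// keeps the line-buffering/length-limiting behaviour while matching the new
// trait, whose required surface is roughly:
//
//     pub trait MakeWriter<'a> {
//         type Writer: std::io::Write;
//         fn make_writer(&'a self) -> Self::Writer;
//     }
//
// (paraphrased from the tracing-subscriber 0.3 docs; default methods omitted)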
struct LimitedWriter<W: Write>(usize, W);
impl<W: Write> Write for LimitedWriter<W> {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
if buf.is_empty() {
@ -341,7 +354,7 @@ pub mod test_util {
}
}
impl MakeWriter for TestWriter {
impl MakeWriter<'_> for TestWriter {
type Writer = SynchronizedWriter<Vec<u8>>;
fn make_writer(&self) -> Self::Writer {
@ -356,9 +369,9 @@ pub mod test_util {
/// Removes non-determinism by removing timestamps from the log lines.
/// It supports the built-in tracing timestamp format and the logfmt timestamps.
pub fn without_timestamps(&self) -> String {
// logfmt or fmt::layer() time format
// logfmt (e.g. `time=12345`) or fmt::layer() (e.g. `2021-10-25T13:48:50.555258`) time format
let timestamp = regex::Regex::new(
r"(?m)( ?time=[0-9]+|^([A-Z][a-z]{2}) \d{1,2} \d{2}:\d{2}:\d{2}.\d{3} *)",
r"(?m)( ?time=[0-9]+|^(\d{4})-\d{1,2}-\d{1,2}T\d{2}:\d{2}:\d{2}.\d+Z *)",
)
.unwrap();
timestamp.replace_all(&self.to_string(), "").to_string()
@ -379,7 +392,7 @@ pub mod test_util {
/// the logging macros invoked by the function.
pub fn log_test<W, F>(builder: Builder<W>, f: F) -> Captured
where
W: MakeWriter + Send + Sync + 'static,
W: for<'writer> MakeWriter<'writer> + Send + Sync + 'static,
F: Fn(),
{
let (writer, output) = TestWriter::new();
@ -401,7 +414,7 @@ pub mod test_util {
/// and returns the captured output.
pub fn simple_test<W>(builder: Builder<W>) -> Captured
where
W: MakeWriter + Send + Sync + 'static,
W: for<'writer> MakeWriter<'writer> + Send + Sync + 'static,
{
log_test(builder, || {
error!("foo");
@ -598,7 +611,8 @@ ERROR foo
#[test]
fn line_buffering() {
let (test_writer, captured) = TestWriter::new();
let mut writer = make_writer(test_writer).make_writer();
let mw = make_writer(test_writer);
let mut writer = mw.make_writer();
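// (binding the `MakeWriter` to `mw` first matters with tracing-subscriber 0.3:
// the writer returned by `make_writer()` can borrow from the `MakeWriter` for
// the `'writer` lifetime, so a temporary would be dropped while still borrowed)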
writer.write_all("foo".as_bytes()).unwrap();
// not flushed yet because no newline has been written
assert_eq!(captured.to_string(), "");
@ -611,7 +625,8 @@ ERROR foo
// another case where the line buffer flushes even before a newline is when the internal buffer limit is reached
let (test_writer, captured) = TestWriter::new();
let mut writer = make_writer(test_writer).make_writer();
let mw = make_writer(test_writer);
let mut writer = mw.make_writer();
let long = std::iter::repeat(b'X')
.take(MAX_LINE_LENGTH)
.collect::<Vec<u8>>();

View File

@ -1,7 +1,7 @@
[package]
name = "write_buffer"
version = "0.1.0"
edition = "2018"
edition = "2021"
[dependencies]
async-trait = "0.1"