From 6a2e5d69f1137a5f7f8b184aa3b6307f41353265 Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Mon, 22 Mar 2021 15:50:42 -0400 Subject: [PATCH 1/6] chore: e2e test for unsigned types --- server/src/query_tests/scenarios.rs | 16 ++++++++++++ server/src/query_tests/sql.rs | 34 ++++++++++++++++++++++++++ server/src/query_tests/table_schema.rs | 18 ++++++++++++++ 3 files changed, 68 insertions(+) diff --git a/server/src/query_tests/scenarios.rs b/server/src/query_tests/scenarios.rs index ca4bd9a868..427621606b 100644 --- a/server/src/query_tests/scenarios.rs +++ b/server/src/query_tests/scenarios.rs @@ -84,6 +84,22 @@ impl DBSetup for TwoMeasurements { } } +pub struct TwoMeasurementsUnsignedType {} +#[async_trait] +impl DBSetup for TwoMeasurementsUnsignedType { + async fn make(&self) -> Vec { + let partition_key = "1970-01-01T00"; + let lp_lines = vec![ + "restaurant,town=andover count=40000u 100", + "restaurant,town=reading count=632u 120", + "school,town=reading count=17u 150", + "school,town=andover count=25u 160", + ]; + + make_one_chunk_scenarios(partition_key, &lp_lines.join("\n")).await + } +} + /// Single measurement that has several different chunks with /// different (but compatible) schema pub struct MultiChunkSchemaMerge {} diff --git a/server/src/query_tests/sql.rs b/server/src/query_tests/sql.rs index 727853dca4..4c247a2313 100644 --- a/server/src/query_tests/sql.rs +++ b/server/src/query_tests/sql.rs @@ -143,6 +143,40 @@ async fn sql_select_with_schema_merge() { run_sql_test_case!(MultiChunkSchemaMerge {}, "SELECT * from cpu", &expected); } +#[tokio::test] +async fn sql_select_from_restaurant() { + let expected = vec![ + "+---------+-------+", + "| town | count |", + "+---------+-------+", + "| andover | 40000 |", + "| reading | 632 |", + "+---------+-------+", + ]; + run_sql_test_case!( + TwoMeasurementsUnsignedType {}, + "SELECT town, count from restaurant", + &expected + ); +} + +#[tokio::test] +async fn sql_select_from_school() { + let expected = vec![ + "+---------+-------+", + "| town | count |", + "+---------+-------+", + "| reading | 17 |", + "| andover | 25 |", + "+---------+-------+", + ]; + run_sql_test_case!( + TwoMeasurementsUnsignedType {}, + "SELECT town, count from school", + &expected + ); +} + #[tokio::test] async fn sql_select_with_schema_merge_subset() { let expected = vec![ diff --git a/server/src/query_tests/table_schema.rs b/server/src/query_tests/table_schema.rs index c3b49df0ae..7e68ff4eb5 100644 --- a/server/src/query_tests/table_schema.rs +++ b/server/src/query_tests/table_schema.rs @@ -113,3 +113,21 @@ async fn list_schema_disk_selection() { run_table_schema_test_case!(TwoMeasurements {}, selection, "disk", expected_schema); } + +#[tokio::test] +async fn list_schema_location_all() { + // we expect columns to come out in lexographic order by name + let expected_schema = SchemaBuilder::new() + .field("count", DataType::UInt64) + .timestamp() + .tag("town") + .build() + .unwrap(); + + run_table_schema_test_case!( + TwoMeasurementsUnsignedType {}, + Selection::All, + "restaurant", + expected_schema + ); +} From cf51a1a3f1c3f0a0f4fc338e474afc72c84c7072 Mon Sep 17 00:00:00 2001 From: Marko Mikulicic Date: Fri, 19 Mar 2021 10:19:13 +0100 Subject: [PATCH 2/6] feat: Add API for ShardConfig --- data_types/src/database_rules.rs | 269 +++++++++++++++++- generated_types/build.rs | 1 + .../influxdata/iox/management/v1/shard.proto | 68 +++++ 3 files changed, 336 insertions(+), 2 deletions(-) create mode 100644 generated_types/protos/influxdata/iox/management/v1/shard.proto diff --git a/data_types/src/database_rules.rs b/data_types/src/database_rules.rs index 5e5013c952..517c126dbd 100644 --- a/data_types/src/database_rules.rs +++ b/data_types/src/database_rules.rs @@ -758,7 +758,7 @@ pub struct ShardConfig { /// Maps a matcher with specific target group. If the line/row matches /// it should be sent to the group. -#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Default)] pub struct MatcherToTargets { pub matcher: Matcher, pub target: NodeGroup, @@ -781,7 +781,7 @@ pub struct HashRing { /// A matcher is used to match routing rules or subscriptions on a row-by-row /// (or line) basis. -#[derive(Debug, Serialize, Deserialize, Clone)] +#[derive(Debug, Serialize, Deserialize, Clone, Default)] pub struct Matcher { /// if provided, match if the table name matches against the regex #[serde(with = "serde_regex")] @@ -800,6 +800,147 @@ impl PartialEq for Matcher { } impl Eq for Matcher {} +impl From for management::ShardConfig { + fn from(shard_config: ShardConfig) -> Self { + Self { + specific_targets: shard_config.specific_targets.map(|i| i.into()), + hash_ring: shard_config.hash_ring.map(|i| i.into()), + ignore_errors: shard_config.ignore_errors, + } + } +} + +impl TryFrom for ShardConfig { + type Error = FieldViolation; + + fn try_from(proto: management::ShardConfig) -> Result { + Ok(Self { + specific_targets: proto + .specific_targets + .map(|i| i.try_into()) + .map_or(Ok(None), |r| r.map(Some))?, + hash_ring: proto + .hash_ring + .map(|i| i.try_into()) + .map_or(Ok(None), |r| r.map(Some))?, + ignore_errors: proto.ignore_errors, + }) + } +} + +/// Returns none if v matches its default value. +fn none_if_default(v: T) -> Option { + if v == Default::default() { + None + } else { + Some(v) + } +} + +impl From for management::MatcherToTargets { + fn from(matcher_to_targets: MatcherToTargets) -> Self { + Self { + matcher: none_if_default(matcher_to_targets.matcher.into()), + target: none_if_default(from_node_group_for_management_node_group( + matcher_to_targets.target, + )), + } + } +} + +impl TryFrom for MatcherToTargets { + type Error = FieldViolation; + + fn try_from(proto: management::MatcherToTargets) -> Result { + Ok(Self { + matcher: proto.matcher.unwrap_or_default().try_into()?, + target: try_from_management_node_group_for_node_group( + proto.target.unwrap_or_default(), + )?, + }) + } +} + +impl From for management::HashRing { + fn from(hash_ring: HashRing) -> Self { + Self { + table_name: hash_ring.table_name, + columns: hash_ring.columns, + node_groups: hash_ring + .node_groups + .into_iter() + .map(from_node_group_for_management_node_group) + .collect(), + } + } +} + +impl TryFrom for HashRing { + type Error = FieldViolation; + + fn try_from(proto: management::HashRing) -> Result { + Ok(Self { + table_name: proto.table_name, + columns: proto.columns, + node_groups: proto + .node_groups + .into_iter() + .map(try_from_management_node_group_for_node_group) + .collect::, _>>()?, + }) + } +} + +// cannot (and/or don't know how to) add impl From inside prost generated code +fn from_node_group_for_management_node_group(node_group: NodeGroup) -> management::NodeGroup { + management::NodeGroup { + nodes: node_group + .into_iter() + .map(|id| management::node_group::Node { id }) + .collect(), + } +} + +fn try_from_management_node_group_for_node_group( + proto: management::NodeGroup, +) -> Result { + Ok(proto.nodes.into_iter().map(|i| i.id).collect()) +} + +impl From for management::Matcher { + fn from(matcher: Matcher) -> Self { + Self { + table_name_regex: matcher + .table_name_regex + .map_or_else(|| "".into(), |r| r.to_string()), + predicate: matcher.predicate.unwrap_or_else(|| "".into()), + } + } +} + +impl TryFrom for Matcher { + type Error = FieldViolation; + + fn try_from(proto: management::Matcher) -> Result { + let table_name_regex = match &proto.table_name_regex as &str { + "" => None, + re => Some(Regex::new(re).map_err(|e| FieldViolation { + field: "table_name_regex".to_string(), + description: e.to_string(), + })?), + }; + let predicate = match proto.predicate { + p if p.is_empty() => None, + p => Some(p), + }; + + Ok(Self { + table_name_regex, + predicate, + }) + } +} + /// `PartitionId` is the object storage identifier for a specific partition. It /// should be a path that can be used against an object store to locate all the /// files and subdirectories for a partition. It takes the form of `/ = protobuf.clone().try_into(); + assert!(matcher.is_err()); + assert_eq!(matcher.err().unwrap().field, "table_name_regex"); + } + + #[test] + fn test_hash_ring_default() { + let protobuf = management::HashRing { + ..Default::default() + }; + + let hash_ring: HashRing = protobuf.clone().try_into().unwrap(); + let back: management::HashRing = hash_ring.clone().into(); + + assert_eq!(hash_ring.table_name, false); + assert_eq!(protobuf.table_name, back.table_name); + assert!(hash_ring.columns.is_empty()); + assert_eq!(protobuf.columns, back.columns); + assert!(hash_ring.node_groups.is_empty()); + assert_eq!(protobuf.node_groups, back.node_groups); + } + + #[test] + fn test_hash_ring_nodes() { + let protobuf = management::HashRing { + node_groups: vec![ + management::NodeGroup { + nodes: vec![ + management::node_group::Node { id: 10 }, + management::node_group::Node { id: 11 }, + management::node_group::Node { id: 12 }, + ], + }, + management::NodeGroup { + nodes: vec![management::node_group::Node { id: 20 }], + }, + ], + ..Default::default() + }; + + let hash_ring: HashRing = protobuf.clone().try_into().unwrap(); + + assert_eq!(hash_ring.node_groups.len(), 2); + assert_eq!(hash_ring.node_groups[0].len(), 3); + assert_eq!(hash_ring.node_groups[1].len(), 1); + } + + #[test] + fn test_matcher_to_targets_default() { + let protobuf = management::MatcherToTargets { + ..Default::default() + }; + + let matcher_to_targets: MatcherToTargets = protobuf.clone().try_into().unwrap(); + let back: management::MatcherToTargets = matcher_to_targets.clone().into(); + + assert_eq!( + matcher_to_targets.matcher, + Matcher { + ..Default::default() + } + ); + assert_eq!(protobuf.matcher, back.matcher); + + assert_eq!(matcher_to_targets.target, Vec::::new()); + assert_eq!(protobuf.target, back.target); + } + + #[test] + fn test_shard_config_default() { + let protobuf = management::ShardConfig { + ..Default::default() + }; + + let shard_config: ShardConfig = protobuf.clone().try_into().unwrap(); + let back: management::ShardConfig = shard_config.clone().into(); + + assert!(shard_config.specific_targets.is_none()); + assert_eq!(protobuf.specific_targets, back.specific_targets); + + assert!(shard_config.hash_ring.is_none()); + assert_eq!(protobuf.hash_ring, back.hash_ring); + + assert_eq!(shard_config.ignore_errors, false); + assert_eq!(protobuf.ignore_errors, back.ignore_errors); + } } diff --git a/generated_types/build.rs b/generated_types/build.rs index 38e65d59e0..cb5782aa69 100644 --- a/generated_types/build.rs +++ b/generated_types/build.rs @@ -37,6 +37,7 @@ fn generate_grpc_types(root: &Path) -> Result<()> { management_path.join("chunk.proto"), management_path.join("partition.proto"), management_path.join("service.proto"), + management_path.join("shard.proto"), management_path.join("jobs.proto"), write_path.join("service.proto"), root.join("grpc/health/v1/service.proto"), diff --git a/generated_types/protos/influxdata/iox/management/v1/shard.proto b/generated_types/protos/influxdata/iox/management/v1/shard.proto new file mode 100644 index 0000000000..eb081287d3 --- /dev/null +++ b/generated_types/protos/influxdata/iox/management/v1/shard.proto @@ -0,0 +1,68 @@ +syntax = "proto3"; +package influxdata.iox.management.v1; + +// NOTE: documentation is manually synced from data_types/src/database_rules.rs + +// `ShardConfig` defines rules for assigning a line/row to an individual +// host or a group of hosts. A shard +// is a logical concept, but the usage is meant to split data into +// mutually exclusive areas. The rough order of organization is: +// database -> shard -> partition -> chunk. For example, you could shard +// based on table name and assign to 1 of 10 shards. Within each +// shard you would have partitions, which would likely be based off time. +// This makes it possible to horizontally scale out writes. +message ShardConfig { + /// An optional matcher. If there is a match, the route will be evaluated to + /// the given targets, otherwise the hash ring will be evaluated. This is + /// useful for overriding the hashring function on some hot spot. For + /// example, if you use the table name as the input to the hash function + /// and your ring has 4 slots. If two tables that are very hot get + /// assigned to the same slot you can override that by putting in a + /// specific matcher to pull that table over to a different node. + MatcherToTargets specific_targets = 1; + + /// An optional default hasher which will route to one in a collection of + /// nodes. + HashRing hash_ring = 2; + + /// If set to true the router will ignore any errors sent by the remote + /// targets in this route. That is, the write request will succeed + /// regardless of this route's success. + bool ignore_errors = 3; +} + +// Maps a matcher with specific target group. If the line/row matches +// it should be sent to the group. +message MatcherToTargets { + Matcher matcher = 1; + NodeGroup target = 2; +} + +/// A matcher is used to match routing rules or subscriptions on a row-by-row +/// (or line) basis. +message Matcher { + // if provided, match if the table name matches against the regex + string table_name_regex = 1; + // paul: what should we use for predicate matching here against a single row/line? + string predicate = 2; +} + +// A collection of IOx nodes +message NodeGroup { + message Node { + uint32 id = 1; + } + repeated Node nodes = 1; +} + +// HashRing is a rule for creating a hash key for a row and mapping that to +// an individual node on a ring. +message HashRing { + // If true the table name will be included in the hash key + bool table_name = 1; + // include the values of these columns in the hash key + repeated string columns = 2; + // ring of node groups. Each group holds a shard + repeated NodeGroup node_groups = 3; +} + From 8962236e0424704b746cd0bd42ded686d8a516c0 Mon Sep 17 00:00:00 2001 From: Marko Mikulicic Date: Mon, 22 Mar 2021 23:35:36 +0100 Subject: [PATCH 3/6] fix: Update data_types/src/database_rules.rs Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com> --- data_types/src/database_rules.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/data_types/src/database_rules.rs b/data_types/src/database_rules.rs index 517c126dbd..00e12f28bf 100644 --- a/data_types/src/database_rules.rs +++ b/data_types/src/database_rules.rs @@ -912,8 +912,9 @@ impl From for management::Matcher { Self { table_name_regex: matcher .table_name_regex - .map_or_else(|| "".into(), |r| r.to_string()), - predicate: matcher.predicate.unwrap_or_else(|| "".into()), + .map(|r| r.to_string()) + .unwrap_or_default(), + predicate: matcher.predicate.unwrap_or_default(), } } } @@ -1441,7 +1442,7 @@ mod tests { ..Default::default() }; - let matcher: Result = protobuf.clone().try_into(); + let matcher: Result = protobuf.try_into(); assert!(matcher.is_err()); assert_eq!(matcher.err().unwrap().field, "table_name_regex"); } @@ -1481,7 +1482,7 @@ mod tests { ..Default::default() }; - let hash_ring: HashRing = protobuf.clone().try_into().unwrap(); + let hash_ring: HashRing = protobuf.try_into().unwrap(); assert_eq!(hash_ring.node_groups.len(), 2); assert_eq!(hash_ring.node_groups[0].len(), 3); From 1cbfea7096cb494f3c2b004127d388f908da24af Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Tue, 23 Mar 2021 11:36:46 +0000 Subject: [PATCH 4/6] refactor: don't implement serialize for Db (#1037) --- server/src/db.rs | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/server/src/db.rs b/server/src/db.rs index a3de6a84e3..01216e4dc3 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -16,7 +16,6 @@ use mutable_buffer::MutableBufferDb; use parking_lot::Mutex; use query::{Database, PartitionChunk}; use read_buffer::Database as ReadBufferDb; -use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt, Snafu}; use crate::buffer::Buffer; @@ -81,31 +80,26 @@ pub type Result = std::result::Result; const STARTING_SEQUENCE: u64 = 1; -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug)] /// This is the main IOx Database object. It is the root object of any /// specific InfluxDB IOx instance pub struct Db { - #[serde(flatten)] pub rules: DatabaseRules, - #[serde(skip)] /// The (optional) mutable buffer stores incoming writes. If a /// database does not have a mutable buffer it can not accept /// writes (it is a read replica) pub mutable_buffer: Option, - #[serde(skip)] /// The read buffer holds chunk data in an in-memory optimized /// format. pub read_buffer: Arc, - #[serde(skip)] /// The wal buffer holds replicated writes in an append in-memory /// buffer. This buffer is used for sending data to subscribers /// and to persist segments in object storage for recovery. pub wal_buffer: Option>, - #[serde(skip)] sequence: AtomicU64, } impl Db { From d4eef65f2a101ca32bc52565f2f0ed2b91a77b8f Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Tue, 23 Mar 2021 12:34:55 +0000 Subject: [PATCH 5/6] feat: share job registry with Db struct (#1038) --- server/src/config.rs | 36 ++++++++++++++------ server/src/db.rs | 26 +++++++++------ server/src/lib.rs | 58 ++++++++++++++++++++++++--------- server/src/query_tests/utils.rs | 4 ++- server/src/snapshot.rs | 6 +++- 5 files changed, 93 insertions(+), 37 deletions(-) diff --git a/server/src/config.rs b/server/src/config.rs index 75d8f8b1ed..7778add316 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -1,5 +1,8 @@ -/// This module contains code for managing the configuration of the server. -use crate::{db::Db, Error, Result}; +use std::{ + collections::{BTreeMap, BTreeSet}, + sync::{Arc, RwLock}, +}; + use data_types::{ database_rules::{DatabaseRules, WriterId}, DatabaseName, @@ -8,22 +11,28 @@ use mutable_buffer::MutableBufferDb; use object_store::path::ObjectStorePath; use read_buffer::Database as ReadBufferDb; -use std::{ - collections::{BTreeMap, BTreeSet}, - sync::{Arc, RwLock}, -}; +/// This module contains code for managing the configuration of the server. +use crate::{db::Db, Error, JobRegistry, Result}; pub(crate) const DB_RULES_FILE_NAME: &str = "rules.json"; /// The Config tracks the configuration od databases and their rules along /// with host groups for replication. It is used as an in-memory structure /// that can be loaded incrementally from objet storage. -#[derive(Default, Debug)] +#[derive(Debug)] pub(crate) struct Config { + jobs: Arc, state: RwLock, } impl Config { + pub(crate) fn new(jobs: Arc) -> Self { + Self { + state: Default::default(), + jobs, + } + } + pub(crate) fn create_db( &self, name: DatabaseName<'static>, @@ -45,7 +54,13 @@ impl Config { let read_buffer = ReadBufferDb::new(); let wal_buffer = rules.wal_buffer_config.as_ref().map(Into::into); - let db = Arc::new(Db::new(rules, mutable_buffer, read_buffer, wal_buffer)); + let db = Arc::new(Db::new( + rules, + mutable_buffer, + read_buffer, + wal_buffer, + Arc::clone(&self.jobs), + )); state.reservations.insert(name.clone()); Ok(CreateDatabaseHandle { @@ -147,13 +162,14 @@ impl<'a> Drop for CreateDatabaseHandle<'a> { #[cfg(test)] mod test { - use super::*; use object_store::{memory::InMemory, ObjectStore, ObjectStoreApi}; + use super::*; + #[test] fn create_db() { let name = DatabaseName::new("foo").unwrap(); - let config = Config::default(); + let config = Config::new(Arc::new(JobRegistry::new())); let rules = DatabaseRules::new(); { diff --git a/server/src/db.rs b/server/src/db.rs index 01216e4dc3..2ee81ae8ce 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -10,20 +10,20 @@ use std::{ }; use async_trait::async_trait; +use parking_lot::Mutex; +use snafu::{OptionExt, ResultExt, Snafu}; +use tracing::info; + +pub(crate) use chunk::DBChunk; use data_types::{chunk::ChunkSummary, database_rules::DatabaseRules}; use internal_types::{data::ReplicatedWrite, selection::Selection}; use mutable_buffer::MutableBufferDb; -use parking_lot::Mutex; use query::{Database, PartitionChunk}; use read_buffer::Database as ReadBufferDb; -use snafu::{OptionExt, ResultExt, Snafu}; -use crate::buffer::Buffer; - -use tracing::info; +use crate::{buffer::Buffer, JobRegistry}; mod chunk; -pub(crate) use chunk::DBChunk; pub mod pred; mod streams; @@ -100,6 +100,8 @@ pub struct Db { /// and to persist segments in object storage for recovery. pub wal_buffer: Option>, + jobs: Arc, + sequence: AtomicU64, } impl Db { @@ -108,6 +110,7 @@ impl Db { mutable_buffer: Option, read_buffer: ReadBufferDb, wal_buffer: Option, + jobs: Arc, ) -> Self { let wal_buffer = wal_buffer.map(Mutex::new); let read_buffer = Arc::new(read_buffer); @@ -116,6 +119,7 @@ impl Db { mutable_buffer, read_buffer, wal_buffer, + jobs, sequence: AtomicU64::new(STARTING_SEQUENCE), } } @@ -376,10 +380,6 @@ impl Database for Db { #[cfg(test)] mod tests { - use crate::query_tests::utils::make_db; - - use super::*; - use arrow_deps::{ arrow::record_batch::RecordBatch, assert_table_eq, datafusion::physical_plan::collect, }; @@ -392,6 +392,10 @@ mod tests { }; use test_helpers::assert_contains; + use crate::query_tests::utils::make_db; + + use super::*; + #[tokio::test] async fn write_no_mutable_buffer() { // Validate that writes are rejected if there is no mutable buffer @@ -592,6 +596,7 @@ mod tests { buffer_size: 300, ..Default::default() }; + let rules = DatabaseRules { mutable_buffer_config: Some(mbconf.clone()), ..Default::default() @@ -602,6 +607,7 @@ mod tests { Some(MutableBufferDb::new("foo")), read_buffer::Database::new(), None, // wal buffer + Arc::new(JobRegistry::new()), ); let mut writer = TestLPWriter::default(); diff --git a/server/src/lib.rs b/server/src/lib.rs index 69e5dfa3b6..103559200d 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -84,9 +84,8 @@ use data_types::{ job::Job, {DatabaseName, DatabaseNameError}, }; -use internal_types::data::{lines_to_replicated_write, ReplicatedWrite}; - use influxdb_line_protocol::ParsedLine; +use internal_types::data::{lines_to_replicated_write, ReplicatedWrite}; use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi}; use query::{exec::Executor, DatabaseStore}; @@ -95,7 +94,9 @@ use crate::{ object_store_path_for_database_config, Config, GRPCConnectionString, DB_RULES_FILE_NAME, }, db::Db, - tracker::{TrackedFutureExt, Tracker, TrackerId, TrackerRegistryWithHistory}, + tracker::{ + TrackedFutureExt, Tracker, TrackerId, TrackerRegistration, TrackerRegistryWithHistory, + }, }; pub mod buffer; @@ -149,9 +150,34 @@ pub enum Error { pub type Result = std::result::Result; -const STORE_ERROR_PAUSE_SECONDS: u64 = 100; const JOB_HISTORY_SIZE: usize = 1000; +/// The global job registry +#[derive(Debug)] +pub struct JobRegistry { + inner: Mutex>, +} + +impl Default for JobRegistry { + fn default() -> Self { + Self { + inner: Mutex::new(TrackerRegistryWithHistory::new(JOB_HISTORY_SIZE)), + } + } +} + +impl JobRegistry { + fn new() -> Self { + Default::default() + } + + pub fn register(&self, job: Job) -> (Tracker, TrackerRegistration) { + self.inner.lock().register(job) + } +} + +const STORE_ERROR_PAUSE_SECONDS: u64 = 100; + /// `Server` is the container struct for how servers store data internally, as /// well as how they communicate with other servers. Each server will have one /// of these structs, which keeps track of all replication and query rules. @@ -162,18 +188,20 @@ pub struct Server { connection_manager: Arc, pub store: Arc, executor: Arc, - jobs: Mutex>, + jobs: Arc, } impl Server { pub fn new(connection_manager: M, store: Arc) -> Self { + let jobs = Arc::new(JobRegistry::new()); + Self { id: AtomicU32::new(SERVER_ID_NOT_SET), - config: Arc::new(Config::default()), + config: Arc::new(Config::new(Arc::clone(&jobs))), store, connection_manager: Arc::new(connection_manager), executor: Arc::new(Executor::new()), - jobs: Mutex::new(TrackerRegistryWithHistory::new(JOB_HISTORY_SIZE)), + jobs, } } @@ -354,7 +382,7 @@ impl Server { let writer_id = self.require_id()?; let store = Arc::clone(&self.store); - let (_, tracker) = self.jobs.lock().register(Job::PersistSegment { + let (_, tracker) = self.jobs.register(Job::PersistSegment { writer_id, segment_id: segment.id, }); @@ -390,7 +418,7 @@ impl Server { } pub fn spawn_dummy_job(&self, nanos: Vec) -> Tracker { - let (tracker, registration) = self.jobs.lock().register(Job::Dummy { + let (tracker, registration) = self.jobs.register(Job::Dummy { nanos: nanos.clone(), }); @@ -422,7 +450,7 @@ impl Server { .db(&name) .context(DatabaseNotFound { db_name: &db_name })?; - let (tracker, registration) = self.jobs.lock().register(Job::CloseChunk { + let (tracker, registration) = self.jobs.register(Job::CloseChunk { db_name: db_name.clone(), partition_key: partition_key.clone(), chunk_id, @@ -467,12 +495,12 @@ impl Server { /// Returns a list of all jobs tracked by this server pub fn tracked_jobs(&self) -> Vec> { - self.jobs.lock().tracked() + self.jobs.inner.lock().tracked() } /// Returns a specific job tracked by this server pub fn get_job(&self, id: TrackerId) -> Option> { - self.jobs.lock().get(id) + self.jobs.inner.lock().get(id) } /// Background worker function @@ -480,7 +508,7 @@ impl Server { let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1)); while !shutdown.is_cancelled() { - self.jobs.lock().reclaim(); + self.jobs.inner.lock().reclaim(); tokio::select! { _ = interval.tick() => {}, @@ -620,6 +648,8 @@ mod tests { use futures::TryStreamExt; use parking_lot::Mutex; use snafu::Snafu; + use tokio::task::JoinHandle; + use tokio_util::sync::CancellationToken; use arrow_deps::{assert_table_eq, datafusion::physical_plan::collect}; use data_types::database_rules::{ @@ -632,8 +662,6 @@ mod tests { use crate::buffer::Segment; use super::*; - use tokio::task::JoinHandle; - use tokio_util::sync::CancellationToken; type TestError = Box; type Result = std::result::Result; diff --git a/server/src/query_tests/utils.rs b/server/src/query_tests/utils.rs index e8ff457b8c..f55cd01463 100644 --- a/server/src/query_tests/utils.rs +++ b/server/src/query_tests/utils.rs @@ -1,7 +1,8 @@ use data_types::database_rules::DatabaseRules; use mutable_buffer::MutableBufferDb; -use crate::db::Db; +use crate::{db::Db, JobRegistry}; +use std::sync::Arc; /// Used for testing: create a Database with a local store pub fn make_db() -> Db { @@ -11,5 +12,6 @@ pub fn make_db() -> Db { Some(MutableBufferDb::new(name)), read_buffer::Database::new(), None, // wal buffer + Arc::new(JobRegistry::new()), ) } diff --git a/server/src/snapshot.rs b/server/src/snapshot.rs index de64050e16..76cdc88b71 100644 --- a/server/src/snapshot.rs +++ b/server/src/snapshot.rs @@ -361,7 +361,10 @@ impl TryClone for MemWriter { #[cfg(test)] mod tests { - use crate::db::{DBChunk, Db}; + use crate::{ + db::{DBChunk, Db}, + JobRegistry, + }; use read_buffer::Database as ReadBufferDb; use super::*; @@ -480,6 +483,7 @@ mem,host=A,region=west used=45 1 Some(MutableBufferDb::new(name)), ReadBufferDb::new(), None, // wal buffer + Arc::new(JobRegistry::new()), ) } } From f8f81a90f71033b22adc8f7cb5ed09311a131e97 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Tue, 23 Mar 2021 13:51:13 +0000 Subject: [PATCH 6/6] feat: sequence server shutdown (#1036) * feat: sequence server shutdown * chore: pr comments --- src/influxdb_ioxd.rs | 46 ++++++++++++++++++++++++++++++-------------- 1 file changed, 32 insertions(+), 14 deletions(-) diff --git a/src/influxdb_ioxd.rs b/src/influxdb_ioxd.rs index 1200729c0b..660d095355 100644 --- a/src/influxdb_ioxd.rs +++ b/src/influxdb_ioxd.rs @@ -2,7 +2,7 @@ use crate::commands::{ logging::LoggingLevel, run::{Config, ObjectStore as ObjStoreOpt}, }; -use futures::{pin_mut, FutureExt}; +use futures::{future::FusedFuture, pin_mut, FutureExt}; use hyper::server::conn::AddrIncoming; use object_store::{ self, aws::AmazonS3, azure::MicrosoftAzure, gcp::GoogleCloudStorage, ObjectStore, @@ -140,8 +140,11 @@ pub async fn main(logging_level: LoggingLevel, config: Config) -> Result<()> { warn!("server ID not set. ID must be set via the INFLUXDB_IOX_ID config or API before writing or querying data."); } - // Construct a token to trigger shutdown - let token = tokio_util::sync::CancellationToken::new(); + // An internal shutdown token for internal workers + let internal_shutdown = tokio_util::sync::CancellationToken::new(); + + // Construct a token to trigger shutdown of API services + let frontend_shutdown = internal_shutdown.child_token(); // Construct and start up gRPC server let grpc_bind_addr = config.grpc_bind_address; @@ -149,21 +152,23 @@ pub async fn main(logging_level: LoggingLevel, config: Config) -> Result<()> { .await .context(StartListeningGrpc { grpc_bind_addr })?; - let grpc_server = rpc::serve(socket, Arc::clone(&app_server), token.clone()).fuse(); + let grpc_server = rpc::serve(socket, Arc::clone(&app_server), frontend_shutdown.clone()).fuse(); info!(bind_address=?grpc_bind_addr, "gRPC server listening"); let bind_addr = config.http_bind_address; let addr = AddrIncoming::bind(&bind_addr).context(StartListeningHttp { bind_addr })?; - let http_server = http::serve(addr, Arc::clone(&app_server), token.clone()).fuse(); + let http_server = http::serve(addr, Arc::clone(&app_server), frontend_shutdown.clone()).fuse(); info!(bind_address=?bind_addr, "HTTP server listening"); let git_hash = option_env!("GIT_HASH").unwrap_or("UNKNOWN"); info!(git_hash, "InfluxDB IOx server ready"); // Get IOx background worker task - let app = app_server.background_worker(token.clone()).fuse(); + let background_worker = app_server + .background_worker(internal_shutdown.clone()) + .fuse(); // Shutdown signal let signal = wait_for_signal().fuse(); @@ -192,19 +197,29 @@ pub async fn main(logging_level: LoggingLevel, config: Config) -> Result<()> { // pin_mut constructs a Pin<&mut T> from a T by preventing moving the T // from the current stack frame and constructing a Pin<&mut T> to it pin_mut!(signal); - pin_mut!(app); + pin_mut!(background_worker); pin_mut!(grpc_server); pin_mut!(http_server); // Return the first error encountered let mut res = Ok(()); - // Trigger graceful shutdown of server components on signal - // or any background service exiting - loop { + // Graceful shutdown can be triggered by sending SIGINT or SIGTERM to the + // process, or by a background task exiting - most likely with an error + // + // Graceful shutdown should then proceed in the following order + // 1. Stop accepting new HTTP and gRPC requests and drain existing connections + // 2. Trigger shutdown of internal background workers loops + // + // This is important to ensure background tasks, such as polling the tracker + // registry, don't exit before HTTP and gRPC requests dependent on them + while !grpc_server.is_terminated() && !http_server.is_terminated() { futures::select! { _ = signal => info!("Shutdown requested"), - _ = app => info!("Background worker shutdown"), + _ = background_worker => { + info!("background worker shutdown prematurely"); + internal_shutdown.cancel(); + }, result = grpc_server => match result { Ok(_) => info!("gRPC server shutdown"), Err(error) => { @@ -219,13 +234,16 @@ pub async fn main(logging_level: LoggingLevel, config: Config) -> Result<()> { res = res.and(Err(Error::ServingHttp{source: error})) } }, - complete => break } - token.cancel() + frontend_shutdown.cancel() } - info!("InfluxDB IOx server completed shutting down"); + info!("frontend shutdown completed"); + internal_shutdown.cancel(); + background_worker.await; + + info!("server completed shutting down"); res }