Merge branch 'main' into er/refactor/aggregate_tidy

pull/24376/head
kodiakhq[bot] 2021-03-23 15:18:19 +00:00 committed by GitHub
commit d55cfac453
12 changed files with 531 additions and 60 deletions

View File

@ -758,7 +758,7 @@ pub struct ShardConfig {
/// Maps a matcher to a specific target group. If the line/row matches,
/// it should be sent to the group.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Default)]
pub struct MatcherToTargets {
pub matcher: Matcher,
pub target: NodeGroup,
@ -781,7 +781,7 @@ pub struct HashRing {
/// A matcher is used to match routing rules or subscriptions on a row-by-row
/// (or line) basis.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[derive(Debug, Serialize, Deserialize, Clone, Default)]
pub struct Matcher {
/// if provided, match if the table name matches against the regex
#[serde(with = "serde_regex")]
@ -800,6 +800,148 @@ impl PartialEq for Matcher {
}
impl Eq for Matcher {}
impl From<ShardConfig> for management::ShardConfig {
fn from(shard_config: ShardConfig) -> Self {
Self {
specific_targets: shard_config.specific_targets.map(|i| i.into()),
hash_ring: shard_config.hash_ring.map(|i| i.into()),
ignore_errors: shard_config.ignore_errors,
}
}
}
impl TryFrom<management::ShardConfig> for ShardConfig {
type Error = FieldViolation;
fn try_from(proto: management::ShardConfig) -> Result<Self, Self::Error> {
Ok(Self {
specific_targets: proto
.specific_targets
.map(|i| i.try_into())
.map_or(Ok(None), |r| r.map(Some))?,
hash_ring: proto
.hash_ring
.map(|i| i.try_into())
.map_or(Ok(None), |r| r.map(Some))?,
ignore_errors: proto.ignore_errors,
})
}
}
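// Aside (illustrative, not part of this commit): the `map_or(Ok(None),
// |r| r.map(Some))` pattern above turns an Option<Result<T, E>> into a
// Result<Option<T>, E>: a missing field stays None, while a present field
// propagates its conversion error. It is equivalent to the stable
// `Option::transpose`:
//
//     proto.specific_targets.map(|i| i.try_into()).transpose()?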
/// Returns `None` if `v` matches its default value.
fn none_if_default<T: Default + PartialEq>(v: T) -> Option<T> {
if v == Default::default() {
None
} else {
Some(v)
}
}
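// A quick sketch of the behavior (illustrative, not part of this commit):
//
//     assert_eq!(none_if_default(0u32), None);
//     assert_eq!(none_if_default(42u32), Some(42));
//     assert_eq!(none_if_default(String::new()), None);
//
// Collapsing defaults to None keeps the generated protobuf compact: fields
// holding their default value are simply left unset.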
impl From<MatcherToTargets> for management::MatcherToTargets {
fn from(matcher_to_targets: MatcherToTargets) -> Self {
Self {
matcher: none_if_default(matcher_to_targets.matcher.into()),
target: none_if_default(from_node_group_for_management_node_group(
matcher_to_targets.target,
)),
}
}
}
impl TryFrom<management::MatcherToTargets> for MatcherToTargets {
type Error = FieldViolation;
fn try_from(proto: management::MatcherToTargets) -> Result<Self, Self::Error> {
Ok(Self {
matcher: proto.matcher.unwrap_or_default().try_into()?,
target: try_from_management_node_group_for_node_group(
proto.target.unwrap_or_default(),
)?,
})
}
}
impl From<HashRing> for management::HashRing {
fn from(hash_ring: HashRing) -> Self {
Self {
table_name: hash_ring.table_name,
columns: hash_ring.columns,
node_groups: hash_ring
.node_groups
.into_iter()
.map(from_node_group_for_management_node_group)
.collect(),
}
}
}
impl TryFrom<management::HashRing> for HashRing {
type Error = FieldViolation;
fn try_from(proto: management::HashRing) -> Result<Self, Self::Error> {
Ok(Self {
table_name: proto.table_name,
columns: proto.columns,
node_groups: proto
.node_groups
.into_iter()
.map(try_from_management_node_group_for_node_group)
.collect::<Result<Vec<_>, _>>()?,
})
}
}
// cannot (and/or don't know how to) add impl From inside prost generated code
fn from_node_group_for_management_node_group(node_group: NodeGroup) -> management::NodeGroup {
management::NodeGroup {
nodes: node_group
.into_iter()
.map(|id| management::node_group::Node { id })
.collect(),
}
}
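// On the Rust side a NodeGroup is a plain Vec<WriterId>, so the proto's
// nested Node wrapper is flattened away here.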
fn try_from_management_node_group_for_node_group(
proto: management::NodeGroup,
) -> Result<NodeGroup, FieldViolation> {
Ok(proto.nodes.into_iter().map(|i| i.id).collect())
}
impl From<Matcher> for management::Matcher {
fn from(matcher: Matcher) -> Self {
Self {
table_name_regex: matcher
.table_name_regex
.map(|r| r.to_string())
.unwrap_or_default(),
predicate: matcher.predicate.unwrap_or_default(),
}
}
}
impl TryFrom<management::Matcher> for Matcher {
type Error = FieldViolation;
fn try_from(proto: management::Matcher) -> Result<Self, Self::Error> {
let table_name_regex = match &proto.table_name_regex as &str {
"" => None,
re => Some(Regex::new(re).map_err(|e| FieldViolation {
field: "table_name_regex".to_string(),
description: e.to_string(),
})?),
};
let predicate = match proto.predicate {
p if p.is_empty() => None,
p => Some(p),
};
Ok(Self {
table_name_regex,
predicate,
})
}
}
/// `PartitionId` is the object storage identifier for a specific partition. It
/// should be a path that can be used against an object store to locate all the
/// files and subdirectories for a partition. It takes the form of `/<writer
@ -1262,4 +1404,128 @@ mod tests {
assert_eq!(err3.field, "column.column_name");
assert_eq!(err3.description, "Field is required");
}
#[test]
fn test_matcher_default() {
let protobuf = management::Matcher {
..Default::default()
};
let matcher: Matcher = protobuf.clone().try_into().unwrap();
let back: management::Matcher = matcher.clone().into();
assert!(matcher.table_name_regex.is_none());
assert_eq!(protobuf.table_name_regex, back.table_name_regex);
assert_eq!(matcher.predicate, None);
assert_eq!(protobuf.predicate, back.predicate);
}
#[test]
fn test_matcher_regexp() {
let protobuf = management::Matcher {
table_name_regex: "^foo$".into(),
..Default::default()
};
let matcher: Matcher = protobuf.clone().try_into().unwrap();
let back: management::Matcher = matcher.clone().into();
assert_eq!(matcher.table_name_regex.unwrap().to_string(), "^foo$");
assert_eq!(protobuf.table_name_regex, back.table_name_regex);
}
#[test]
fn test_matcher_bad_regexp() {
let protobuf = management::Matcher {
table_name_regex: "*".into(),
..Default::default()
};
let matcher: Result<Matcher, FieldViolation> = protobuf.try_into();
assert!(matcher.is_err());
assert_eq!(matcher.err().unwrap().field, "table_name_regex");
}
#[test]
fn test_hash_ring_default() {
let protobuf = management::HashRing {
..Default::default()
};
let hash_ring: HashRing = protobuf.clone().try_into().unwrap();
let back: management::HashRing = hash_ring.clone().into();
assert_eq!(hash_ring.table_name, false);
assert_eq!(protobuf.table_name, back.table_name);
assert!(hash_ring.columns.is_empty());
assert_eq!(protobuf.columns, back.columns);
assert!(hash_ring.node_groups.is_empty());
assert_eq!(protobuf.node_groups, back.node_groups);
}
#[test]
fn test_hash_ring_nodes() {
let protobuf = management::HashRing {
node_groups: vec![
management::NodeGroup {
nodes: vec![
management::node_group::Node { id: 10 },
management::node_group::Node { id: 11 },
management::node_group::Node { id: 12 },
],
},
management::NodeGroup {
nodes: vec![management::node_group::Node { id: 20 }],
},
],
..Default::default()
};
let hash_ring: HashRing = protobuf.try_into().unwrap();
assert_eq!(hash_ring.node_groups.len(), 2);
assert_eq!(hash_ring.node_groups[0].len(), 3);
assert_eq!(hash_ring.node_groups[1].len(), 1);
}
#[test]
fn test_matcher_to_targets_default() {
let protobuf = management::MatcherToTargets {
..Default::default()
};
let matcher_to_targets: MatcherToTargets = protobuf.clone().try_into().unwrap();
let back: management::MatcherToTargets = matcher_to_targets.clone().into();
assert_eq!(
matcher_to_targets.matcher,
Matcher {
..Default::default()
}
);
assert_eq!(protobuf.matcher, back.matcher);
assert_eq!(matcher_to_targets.target, Vec::<WriterId>::new());
assert_eq!(protobuf.target, back.target);
}
#[test]
fn test_shard_config_default() {
let protobuf = management::ShardConfig {
..Default::default()
};
let shard_config: ShardConfig = protobuf.clone().try_into().unwrap();
let back: management::ShardConfig = shard_config.clone().into();
assert!(shard_config.specific_targets.is_none());
assert_eq!(protobuf.specific_targets, back.specific_targets);
assert!(shard_config.hash_ring.is_none());
assert_eq!(protobuf.hash_ring, back.hash_ring);
assert_eq!(shard_config.ignore_errors, false);
assert_eq!(protobuf.ignore_errors, back.ignore_errors);
}
}

View File

@ -37,6 +37,7 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
management_path.join("chunk.proto"),
management_path.join("partition.proto"),
management_path.join("service.proto"),
management_path.join("shard.proto"),
management_path.join("jobs.proto"),
write_path.join("service.proto"),
root.join("grpc/health/v1/service.proto"),

View File

@ -0,0 +1,68 @@
syntax = "proto3";
package influxdata.iox.management.v1;
// NOTE: documentation is manually synced from data_types/src/database_rules.rs
// `ShardConfig` defines rules for assigning a line/row to an individual
// host or a group of hosts. A shard
// is a logical concept, but the usage is meant to split data into
// mutually exclusive areas. The rough order of organization is:
// database -> shard -> partition -> chunk. For example, you could shard
// based on table name and assign to 1 of 10 shards. Within each
// shard you would have partitions, which would likely be based off time.
// This makes it possible to horizontally scale out writes.
message ShardConfig {
// An optional matcher. If there is a match, the route will be evaluated to
// the given targets, otherwise the hash ring will be evaluated. This is
// useful for overriding the hash ring function on some hot spot. For
// example, suppose you use the table name as the input to the hash
// function and your ring has 4 slots. If two tables that are very hot get
// assigned to the same slot, you can override that by putting in a
// specific matcher to pull that table over to a different node.
MatcherToTargets specific_targets = 1;
// An optional default hasher which will route to one in a collection of
// nodes.
HashRing hash_ring = 2;
// If set to true the router will ignore any errors sent by the remote
// targets in this route. That is, the write request will succeed
// regardless of this route's success.
bool ignore_errors = 3;
}
// Maps a matcher to a specific target group. If the line/row matches,
// it should be sent to the group.
message MatcherToTargets {
Matcher matcher = 1;
NodeGroup target = 2;
}
// A matcher is used to match routing rules or subscriptions on a row-by-row
// (or line) basis.
message Matcher {
// if provided, match if the table name matches against the regex
string table_name_regex = 1;
// paul: what should we use for predicate matching here against a single row/line?
string predicate = 2;
}
// A collection of IOx nodes
message NodeGroup {
message Node {
uint32 id = 1;
}
repeated Node nodes = 1;
}
// HashRing is a rule for creating a hash key for a row and mapping that to
// an individual node on a ring.
message HashRing {
// If true the table name will be included in the hash key
bool table_name = 1;
// include the values of these columns in the hash key
repeated string columns = 2;
// ring of node groups. Each group holds a shard
repeated NodeGroup node_groups = 3;
}
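To make the routing rules concrete, here is a sketch (not part of this commit) of a shard config built with the Rust types from database_rules.rs shown earlier: a hot table is pinned to node 99 via specific_targets, while everything else hashes by table name across four single-node groups. Field names follow the struct definitions above; Regex comes from the regex crate imported in that file.

let shard_config = ShardConfig {
    // Override: rows whose table name matches go straight to node 99.
    specific_targets: Some(MatcherToTargets {
        matcher: Matcher {
            table_name_regex: Some(Regex::new("^hot_table$").unwrap()),
            predicate: None,
        },
        target: vec![99], // NodeGroup = Vec<WriterId>
    }),
    // Everything else: hash the table name onto one of four nodes.
    hash_ring: Some(HashRing {
        table_name: true,
        columns: vec![],
        node_groups: vec![vec![1], vec![2], vec![3], vec![4]],
    }),
    ignore_errors: false,
};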

View File

@ -1,5 +1,8 @@
/// This module contains code for managing the configuration of the server.
use crate::{db::Db, Error, Result};
use std::{
collections::{BTreeMap, BTreeSet},
sync::{Arc, RwLock},
};
use data_types::{
database_rules::{DatabaseRules, WriterId},
DatabaseName,
@ -8,22 +11,28 @@ use mutable_buffer::MutableBufferDb;
use object_store::path::ObjectStorePath;
use read_buffer::Database as ReadBufferDb;
use std::{
collections::{BTreeMap, BTreeSet},
sync::{Arc, RwLock},
};
/// This module contains code for managing the configuration of the server.
use crate::{db::Db, Error, JobRegistry, Result};
pub(crate) const DB_RULES_FILE_NAME: &str = "rules.json";
/// The Config tracks the configuration of databases and their rules along
/// with host groups for replication. It is used as an in-memory structure
/// that can be loaded incrementally from object storage.
#[derive(Default, Debug)]
#[derive(Debug)]
pub(crate) struct Config {
jobs: Arc<JobRegistry>,
state: RwLock<ConfigState>,
}
impl Config {
pub(crate) fn new(jobs: Arc<JobRegistry>) -> Self {
Self {
state: Default::default(),
jobs,
}
}
pub(crate) fn create_db(
&self,
name: DatabaseName<'static>,
@ -45,7 +54,13 @@ impl Config {
let read_buffer = ReadBufferDb::new();
let wal_buffer = rules.wal_buffer_config.as_ref().map(Into::into);
let db = Arc::new(Db::new(rules, mutable_buffer, read_buffer, wal_buffer));
let db = Arc::new(Db::new(
rules,
mutable_buffer,
read_buffer,
wal_buffer,
Arc::clone(&self.jobs),
));
state.reservations.insert(name.clone());
Ok(CreateDatabaseHandle {
@ -147,13 +162,14 @@ impl<'a> Drop for CreateDatabaseHandle<'a> {
#[cfg(test)]
mod test {
use super::*;
use object_store::{memory::InMemory, ObjectStore, ObjectStoreApi};
use super::*;
#[test]
fn create_db() {
let name = DatabaseName::new("foo").unwrap();
let config = Config::default();
let config = Config::new(Arc::new(JobRegistry::new()));
let rules = DatabaseRules::new();
{

View File

@ -10,21 +10,20 @@ use std::{
};
use async_trait::async_trait;
use parking_lot::Mutex;
use snafu::{OptionExt, ResultExt, Snafu};
use tracing::info;
pub(crate) use chunk::DBChunk;
use data_types::{chunk::ChunkSummary, database_rules::DatabaseRules};
use internal_types::{data::ReplicatedWrite, selection::Selection};
use mutable_buffer::MutableBufferDb;
use parking_lot::Mutex;
use query::{Database, PartitionChunk};
use read_buffer::Database as ReadBufferDb;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, Snafu};
use crate::buffer::Buffer;
use tracing::info;
use crate::{buffer::Buffer, JobRegistry};
mod chunk;
pub(crate) use chunk::DBChunk;
pub mod pred;
mod streams;
@ -81,31 +80,28 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
const STARTING_SEQUENCE: u64 = 1;
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug)]
/// This is the main IOx Database object. It is the root object of any
/// specific InfluxDB IOx instance
pub struct Db {
#[serde(flatten)]
pub rules: DatabaseRules,
#[serde(skip)]
/// The (optional) mutable buffer stores incoming writes. If a
/// database does not have a mutable buffer it can not accept
/// writes (it is a read replica)
pub mutable_buffer: Option<MutableBufferDb>,
#[serde(skip)]
/// The read buffer holds chunk data in an in-memory optimized
/// format.
pub read_buffer: Arc<ReadBufferDb>,
#[serde(skip)]
/// The wal buffer holds replicated writes in an append in-memory
/// buffer. This buffer is used for sending data to subscribers
/// and to persist segments in object storage for recovery.
pub wal_buffer: Option<Mutex<Buffer>>,
#[serde(skip)]
jobs: Arc<JobRegistry>,
sequence: AtomicU64,
}
impl Db {
@ -114,6 +110,7 @@ impl Db {
mutable_buffer: Option<MutableBufferDb>,
read_buffer: ReadBufferDb,
wal_buffer: Option<Buffer>,
jobs: Arc<JobRegistry>,
) -> Self {
let wal_buffer = wal_buffer.map(Mutex::new);
let read_buffer = Arc::new(read_buffer);
@ -122,6 +119,7 @@ impl Db {
mutable_buffer,
read_buffer,
wal_buffer,
jobs,
sequence: AtomicU64::new(STARTING_SEQUENCE),
}
}
@ -382,10 +380,6 @@ impl Database for Db {
#[cfg(test)]
mod tests {
use crate::query_tests::utils::make_db;
use super::*;
use arrow_deps::{
arrow::record_batch::RecordBatch, assert_table_eq, datafusion::physical_plan::collect,
};
@ -398,6 +392,10 @@ mod tests {
};
use test_helpers::assert_contains;
use crate::query_tests::utils::make_db;
use super::*;
#[tokio::test]
async fn write_no_mutable_buffer() {
// Validate that writes are rejected if there is no mutable buffer
@ -598,6 +596,7 @@ mod tests {
buffer_size: 300,
..Default::default()
};
let rules = DatabaseRules {
mutable_buffer_config: Some(mbconf.clone()),
..Default::default()
@ -608,6 +607,7 @@ mod tests {
Some(MutableBufferDb::new("foo")),
read_buffer::Database::new(),
None, // wal buffer
Arc::new(JobRegistry::new()),
);
let mut writer = TestLPWriter::default();

View File

@ -84,9 +84,8 @@ use data_types::{
job::Job,
{DatabaseName, DatabaseNameError},
};
use internal_types::data::{lines_to_replicated_write, ReplicatedWrite};
use influxdb_line_protocol::ParsedLine;
use internal_types::data::{lines_to_replicated_write, ReplicatedWrite};
use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi};
use query::{exec::Executor, DatabaseStore};
@ -95,7 +94,9 @@ use crate::{
object_store_path_for_database_config, Config, GRPCConnectionString, DB_RULES_FILE_NAME,
},
db::Db,
tracker::{TrackedFutureExt, Tracker, TrackerId, TrackerRegistryWithHistory},
tracker::{
TrackedFutureExt, Tracker, TrackerId, TrackerRegistration, TrackerRegistryWithHistory,
},
};
pub mod buffer;
@ -149,9 +150,34 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
const STORE_ERROR_PAUSE_SECONDS: u64 = 100;
const JOB_HISTORY_SIZE: usize = 1000;
/// The global job registry
#[derive(Debug)]
pub struct JobRegistry {
inner: Mutex<TrackerRegistryWithHistory<Job>>,
}
impl Default for JobRegistry {
fn default() -> Self {
Self {
inner: Mutex::new(TrackerRegistryWithHistory::new(JOB_HISTORY_SIZE)),
}
}
}
impl JobRegistry {
fn new() -> Self {
Default::default()
}
pub fn register(&self, job: Job) -> (Tracker<Job>, TrackerRegistration) {
self.inner.lock().register(job)
}
}
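// Typical usage (a sketch; the exact call shape is inferred from the
// TrackedFutureExt import in this diff, not shown here):
//
//     let (tracker, registration) = jobs.register(Job::Dummy { nanos: vec![10] });
//     tokio::spawn(async move { /* do the work */ }.track(registration));
//
// Registering hands back a Tracker for observers plus a TrackerRegistration
// that the worker future carries so its lifecycle is tracked.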
const STORE_ERROR_PAUSE_SECONDS: u64 = 100;
/// `Server` is the container struct for how servers store data internally, as
/// well as how they communicate with other servers. Each server will have one
/// of these structs, which keeps track of all replication and query rules.
@ -162,18 +188,20 @@ pub struct Server<M: ConnectionManager> {
connection_manager: Arc<M>,
pub store: Arc<ObjectStore>,
executor: Arc<Executor>,
jobs: Mutex<TrackerRegistryWithHistory<Job>>,
jobs: Arc<JobRegistry>,
}
impl<M: ConnectionManager> Server<M> {
pub fn new(connection_manager: M, store: Arc<ObjectStore>) -> Self {
let jobs = Arc::new(JobRegistry::new());
Self {
id: AtomicU32::new(SERVER_ID_NOT_SET),
config: Arc::new(Config::default()),
config: Arc::new(Config::new(Arc::clone(&jobs))),
store,
connection_manager: Arc::new(connection_manager),
executor: Arc::new(Executor::new()),
jobs: Mutex::new(TrackerRegistryWithHistory::new(JOB_HISTORY_SIZE)),
jobs,
}
}
@ -354,7 +382,7 @@ impl<M: ConnectionManager> Server<M> {
let writer_id = self.require_id()?;
let store = Arc::clone(&self.store);
let (_, tracker) = self.jobs.lock().register(Job::PersistSegment {
let (_, tracker) = self.jobs.register(Job::PersistSegment {
writer_id,
segment_id: segment.id,
});
@ -390,7 +418,7 @@ impl<M: ConnectionManager> Server<M> {
}
pub fn spawn_dummy_job(&self, nanos: Vec<u64>) -> Tracker<Job> {
let (tracker, registration) = self.jobs.lock().register(Job::Dummy {
let (tracker, registration) = self.jobs.register(Job::Dummy {
nanos: nanos.clone(),
});
@ -422,7 +450,7 @@ impl<M: ConnectionManager> Server<M> {
.db(&name)
.context(DatabaseNotFound { db_name: &db_name })?;
let (tracker, registration) = self.jobs.lock().register(Job::CloseChunk {
let (tracker, registration) = self.jobs.register(Job::CloseChunk {
db_name: db_name.clone(),
partition_key: partition_key.clone(),
chunk_id,
@ -467,12 +495,12 @@ impl<M: ConnectionManager> Server<M> {
/// Returns a list of all jobs tracked by this server
pub fn tracked_jobs(&self) -> Vec<Tracker<Job>> {
self.jobs.lock().tracked()
self.jobs.inner.lock().tracked()
}
/// Returns a specific job tracked by this server
pub fn get_job(&self, id: TrackerId) -> Option<Tracker<Job>> {
self.jobs.lock().get(id)
self.jobs.inner.lock().get(id)
}
/// Background worker function
@ -480,7 +508,7 @@ impl<M: ConnectionManager> Server<M> {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
while !shutdown.is_cancelled() {
self.jobs.lock().reclaim();
self.jobs.inner.lock().reclaim();
tokio::select! {
_ = interval.tick() => {},
@ -620,6 +648,8 @@ mod tests {
use futures::TryStreamExt;
use parking_lot::Mutex;
use snafu::Snafu;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use arrow_deps::{assert_table_eq, datafusion::physical_plan::collect};
use data_types::database_rules::{
@ -632,8 +662,6 @@ mod tests {
use crate::buffer::Segment;
use super::*;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
type TestError = Box<dyn std::error::Error + Send + Sync + 'static>;
type Result<T = (), E = TestError> = std::result::Result<T, E>;

View File

@ -84,6 +84,22 @@ impl DBSetup for TwoMeasurements {
}
}
pub struct TwoMeasurementsUnsignedType {}
#[async_trait]
impl DBSetup for TwoMeasurementsUnsignedType {
async fn make(&self) -> Vec<DBScenario> {
let partition_key = "1970-01-01T00";
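// the `u` suffix marks these counts as unsigned integer fields, which
// should surface as a UInt64 column (see the schema test further below)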
let lp_lines = vec![
"restaurant,town=andover count=40000u 100",
"restaurant,town=reading count=632u 120",
"school,town=reading count=17u 150",
"school,town=andover count=25u 160",
];
make_one_chunk_scenarios(partition_key, &lp_lines.join("\n")).await
}
}
/// Single measurement that has several different chunks with
/// different (but compatible) schema
pub struct MultiChunkSchemaMerge {}

View File

@ -143,6 +143,40 @@ async fn sql_select_with_schema_merge() {
run_sql_test_case!(MultiChunkSchemaMerge {}, "SELECT * from cpu", &expected);
}
#[tokio::test]
async fn sql_select_from_restaurant() {
let expected = vec![
"+---------+-------+",
"| town | count |",
"+---------+-------+",
"| andover | 40000 |",
"| reading | 632 |",
"+---------+-------+",
];
run_sql_test_case!(
TwoMeasurementsUnsignedType {},
"SELECT town, count from restaurant",
&expected
);
}
#[tokio::test]
async fn sql_select_from_school() {
let expected = vec![
"+---------+-------+",
"| town | count |",
"+---------+-------+",
"| reading | 17 |",
"| andover | 25 |",
"+---------+-------+",
];
run_sql_test_case!(
TwoMeasurementsUnsignedType {},
"SELECT town, count from school",
&expected
);
}
#[tokio::test]
async fn sql_select_with_schema_merge_subset() {
let expected = vec![

View File

@ -113,3 +113,21 @@ async fn list_schema_disk_selection() {
run_table_schema_test_case!(TwoMeasurements {}, selection, "disk", expected_schema);
}
#[tokio::test]
async fn list_schema_location_all() {
// we expect columns to come out in lexicographic order by name
let expected_schema = SchemaBuilder::new()
.field("count", DataType::UInt64)
.timestamp()
.tag("town")
.build()
.unwrap();
run_table_schema_test_case!(
TwoMeasurementsUnsignedType {},
Selection::All,
"restaurant",
expected_schema
);
}

View File

@ -1,7 +1,8 @@
use data_types::database_rules::DatabaseRules;
use mutable_buffer::MutableBufferDb;
use crate::db::Db;
use crate::{db::Db, JobRegistry};
use std::sync::Arc;
/// Used for testing: create a Database with a local store
pub fn make_db() -> Db {
@ -11,5 +12,6 @@ pub fn make_db() -> Db {
Some(MutableBufferDb::new(name)),
read_buffer::Database::new(),
None, // wal buffer
Arc::new(JobRegistry::new()),
)
}

View File

@ -361,7 +361,10 @@ impl TryClone for MemWriter {
#[cfg(test)]
mod tests {
use crate::db::{DBChunk, Db};
use crate::{
db::{DBChunk, Db},
JobRegistry,
};
use read_buffer::Database as ReadBufferDb;
use super::*;
@ -480,6 +483,7 @@ mem,host=A,region=west used=45 1
Some(MutableBufferDb::new(name)),
ReadBufferDb::new(),
None, // wal buffer
Arc::new(JobRegistry::new()),
)
}
}

View File

@ -2,7 +2,7 @@ use crate::commands::{
logging::LoggingLevel,
run::{Config, ObjectStore as ObjStoreOpt},
};
use futures::{pin_mut, FutureExt};
use futures::{future::FusedFuture, pin_mut, FutureExt};
use hyper::server::conn::AddrIncoming;
use object_store::{
self, aws::AmazonS3, azure::MicrosoftAzure, gcp::GoogleCloudStorage, ObjectStore,
@ -140,8 +140,11 @@ pub async fn main(logging_level: LoggingLevel, config: Config) -> Result<()> {
warn!("server ID not set. ID must be set via the INFLUXDB_IOX_ID config or API before writing or querying data.");
}
// Construct a token to trigger shutdown
let token = tokio_util::sync::CancellationToken::new();
// An internal shutdown token for internal workers
let internal_shutdown = tokio_util::sync::CancellationToken::new();
// Construct a token to trigger shutdown of API services
let frontend_shutdown = internal_shutdown.child_token();
// Construct and start up gRPC server
let grpc_bind_addr = config.grpc_bind_address;
@ -149,21 +152,23 @@ pub async fn main(logging_level: LoggingLevel, config: Config) -> Result<()> {
.await
.context(StartListeningGrpc { grpc_bind_addr })?;
let grpc_server = rpc::serve(socket, Arc::clone(&app_server), token.clone()).fuse();
let grpc_server = rpc::serve(socket, Arc::clone(&app_server), frontend_shutdown.clone()).fuse();
info!(bind_address=?grpc_bind_addr, "gRPC server listening");
let bind_addr = config.http_bind_address;
let addr = AddrIncoming::bind(&bind_addr).context(StartListeningHttp { bind_addr })?;
let http_server = http::serve(addr, Arc::clone(&app_server), token.clone()).fuse();
let http_server = http::serve(addr, Arc::clone(&app_server), frontend_shutdown.clone()).fuse();
info!(bind_address=?bind_addr, "HTTP server listening");
let git_hash = option_env!("GIT_HASH").unwrap_or("UNKNOWN");
info!(git_hash, "InfluxDB IOx server ready");
// Get IOx background worker task
let app = app_server.background_worker(token.clone()).fuse();
let background_worker = app_server
.background_worker(internal_shutdown.clone())
.fuse();
// Shutdown signal
let signal = wait_for_signal().fuse();
@ -192,19 +197,29 @@ pub async fn main(logging_level: LoggingLevel, config: Config) -> Result<()> {
// pin_mut constructs a Pin<&mut T> from a T by preventing moving the T
// from the current stack frame and constructing a Pin<&mut T> to it
pin_mut!(signal);
pin_mut!(app);
pin_mut!(background_worker);
pin_mut!(grpc_server);
pin_mut!(http_server);
// Return the first error encountered
let mut res = Ok(());
// Trigger graceful shutdown of server components on signal
// or any background service exiting
loop {
// Graceful shutdown can be triggered by sending SIGINT or SIGTERM to the
// process, or by a background task exiting - most likely with an error
//
// Graceful shutdown should then proceed in the following order
// 1. Stop accepting new HTTP and gRPC requests and drain existing connections
// 2. Trigger shutdown of internal background workers loops
//
// This is important to ensure background tasks, such as polling the tracker
// registry, don't exit before HTTP and gRPC requests dependent on them
while !grpc_server.is_terminated() && !http_server.is_terminated() {
futures::select! {
_ = signal => info!("Shutdown requested"),
_ = app => info!("Background worker shutdown"),
_ = background_worker => {
info!("background worker shutdown prematurely");
internal_shutdown.cancel();
},
result = grpc_server => match result {
Ok(_) => info!("gRPC server shutdown"),
Err(error) => {
@ -219,13 +234,16 @@ pub async fn main(logging_level: LoggingLevel, config: Config) -> Result<()> {
res = res.and(Err(Error::ServingHttp{source: error}))
}
},
complete => break
}
token.cancel()
frontend_shutdown.cancel()
}
info!("InfluxDB IOx server completed shutting down");
info!("frontend shutdown completed");
internal_shutdown.cancel();
background_worker.await;
info!("server completed shutting down");
res
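The two-token scheme above relies on tokio-util's parent/child cancellation semantics: cancelling the parent (internal_shutdown) also cancels the child (frontend_shutdown), but not the other way around, which is what lets the frontend drain while the background workers keep serving it. A minimal sketch of that behavior (assuming tokio-util as a dependency):

use tokio_util::sync::CancellationToken;

fn main() {
    let internal_shutdown = CancellationToken::new();
    let frontend_shutdown = internal_shutdown.child_token();

    // Cancelling the child stops only the frontend...
    frontend_shutdown.cancel();
    assert!(frontend_shutdown.is_cancelled());
    assert!(!internal_shutdown.is_cancelled());

    // ...whereas cancelling the parent stops both.
    internal_shutdown.cancel();
    assert!(internal_shutdown.is_cancelled());
}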
}