refactor: rename write_buffer --> mutable_buffer (#595)

* refactor: git mv write_buffer mutable_buffer

* refactor: update crate name references

* refactor: update some more references
pull/24376/head
Andrew Lamb 2020-12-22 10:49:53 -05:00 committed by GitHub
parent 163f34c27b
commit 48c43b136c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 79 additions and 80 deletions

48
Cargo.lock generated
View File

@ -1362,6 +1362,7 @@ dependencies = [
"influxdb_tsm",
"ingest",
"mem_qe",
"mutable_buffer",
"object_store",
"opentelemetry",
"opentelemetry-jaeger",
@ -1387,7 +1388,6 @@ dependencies = [
"tracing-opentelemetry",
"tracing-subscriber",
"wal",
"write_buffer",
]
[[package]]
@ -1794,6 +1794,28 @@ version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1255076139a83bb467426e7f8d0134968a8118844faa755985e077cf31850333"
[[package]]
name = "mutable_buffer"
version = "0.1.0"
dependencies = [
"arrow_deps",
"async-trait",
"chrono",
"criterion",
"data_types",
"flatbuffers",
"generated_types",
"influxdb_line_protocol",
"query",
"snafu",
"sqlparser",
"string-interner",
"test_helpers",
"tokio",
"tracing",
"wal",
]
[[package]]
name = "native-tls"
version = "0.2.6"
@ -2901,6 +2923,7 @@ dependencies = [
"futures",
"generated_types",
"influxdb_line_protocol",
"mutable_buffer",
"object_store",
"query",
"serde",
@ -2909,7 +2932,6 @@ dependencies = [
"tokio",
"tracing",
"uuid",
"write_buffer",
]
[[package]]
@ -4007,28 +4029,6 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "write_buffer"
version = "0.1.0"
dependencies = [
"arrow_deps",
"async-trait",
"chrono",
"criterion",
"data_types",
"flatbuffers",
"generated_types",
"influxdb_line_protocol",
"query",
"snafu",
"sqlparser",
"string-interner",
"test_helpers",
"tokio",
"tracing",
"wal",
]
[[package]]
name = "ws2_32-sys"
version = "0.2.1"

View File

@ -21,7 +21,7 @@ members = [
"influxdb_tsm",
"query",
"wal",
"write_buffer",
"mutable_buffer",
"influxdb2_client",
]
@ -40,7 +40,7 @@ influxdb_line_protocol = { path = "influxdb_line_protocol" }
mem_qe = { path = "mem_qe" }
segment_store = { path = "segment_store" }
packers = { path = "packers" }
write_buffer = { path = "write_buffer" }
mutable_buffer = { path = "mutable_buffer" }
object_store = { path = "object_store" }
query = { path = "query" }
influxdb_tsm = { path = "influxdb_tsm" }

View File

@ -1,5 +1,5 @@
[package]
name = "write_buffer"
name = "mutable_buffer"
version = "0.1.0"
authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
edition = "2018"

View File

@ -1,8 +1,8 @@
use criterion::{criterion_group, criterion_main, Criterion, Throughput};
use influxdb_line_protocol as line_parser;
use mutable_buffer::{restore_partitions_from_wal, MutableBufferDb};
use query::TSDatabase;
use wal::{Entry, WalBuilder};
use write_buffer::{restore_partitions_from_wal, Db};
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
type Result<T, E = Error> = std::result::Result<T, E>;
@ -73,7 +73,7 @@ async fn common_create_entries(
) -> Result<(Vec<Entry>, usize)> {
let tmp_dir = test_helpers::tmp_dir()?;
let mut wal_dir = tmp_dir.as_ref().to_owned();
let db = Db::try_with_wal("mydb", &mut wal_dir).await?;
let db = MutableBufferDb::try_with_wal("mydb", &mut wal_dir).await?;
let mut lp_entries = Vec::new();
f(&mut |entry| lp_entries.push(entry));

View File

@ -239,14 +239,14 @@ impl From<crate::partition::Error> for Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, Default)]
pub struct Db {
pub struct MutableBufferDb {
pub name: String,
// TODO: partitions need to be wrapped in an Arc if they're going to be used without this lock
partitions: RwLock<Vec<Partition>>,
wal_details: Option<WalDetails>,
}
impl Db {
impl MutableBufferDb {
/// New creates a new in-memory only write buffer database
pub fn new(name: impl Into<String>) -> Self {
Self {
@ -377,7 +377,7 @@ impl Db {
}
#[async_trait]
impl TSDatabase for Db {
impl TSDatabase for MutableBufferDb {
type Partition = crate::partition::Partition;
type Error = Error;
@ -537,7 +537,7 @@ impl TSDatabase for Db {
}
#[async_trait]
impl SQLDatabase for Db {
impl SQLDatabase for MutableBufferDb {
type Partition = Partition;
type Error = Error;
@ -716,7 +716,7 @@ trait Visitor {
}
}
impl Db {
impl MutableBufferDb {
/// returns the number of partitions in this database
pub async fn len(&self) -> usize {
self.partitions.read().await.len()
@ -1270,7 +1270,7 @@ mod tests {
}
// query the table names, with optional range predicate
async fn table_names(db: &Db, predicate: Predicate) -> Result<StringSet> {
async fn table_names(db: &MutableBufferDb, predicate: Predicate) -> Result<StringSet> {
let plan = db.table_names(predicate).await?;
let executor = Executor::default();
let s = executor.to_string_set(plan).await?;
@ -1284,7 +1284,7 @@ mod tests {
async fn list_table_names() -> Result {
let mut dir = test_helpers::tmp_dir()?.into_path();
let db = Db::try_with_wal("mydb", &mut dir).await?;
let db = MutableBufferDb::try_with_wal("mydb", &mut dir).await?;
// no tables initially
assert_eq!(
@ -1312,7 +1312,7 @@ mod tests {
async fn list_table_names_timestamps() -> Result {
let mut dir = test_helpers::tmp_dir()?.into_path();
let db = Db::try_with_wal("mydb", &mut dir).await?;
let db = MutableBufferDb::try_with_wal("mydb", &mut dir).await?;
// write two different tables at the following times:
// cpu: 100 and 150
@ -1348,7 +1348,7 @@ mod tests {
async fn missing_tags_are_null() -> Result {
let mut dir = test_helpers::tmp_dir()?.into_path();
let db = Db::try_with_wal("mydb", &mut dir).await?;
let db = MutableBufferDb::try_with_wal("mydb", &mut dir).await?;
// Note the `region` tag is introduced in the second line, so
// the values in prior rows for the region column are
@ -1438,7 +1438,7 @@ mod tests {
let disk_columns = &["region", "host", "bytes", "used_percent", "time"];
{
let db = Db::try_with_wal("mydb", &mut dir).await?;
let db = MutableBufferDb::try_with_wal("mydb", &mut dir).await?;
let lines: Vec<_> = parse_lines("cpu,region=west,host=A user=23.2,other=1i,str=\"some string\",b=true 10\ndisk,region=west,host=A bytes=23432323i,used_percent=76.2 10").map(|l| l.unwrap()).collect();
db.write_lines(&lines).await?;
let lines: Vec<_> = parse_lines("cpu,region=west,host=B user=23.1 15")
@ -1466,7 +1466,7 @@ mod tests {
// check that it recovers from the wal
{
let db = Db::restore_from_wal(&dir).await?;
let db = MutableBufferDb::restore_from_wal(&dir).await?;
let partitions = db.table_to_arrow("cpu", cpu_columns).await?;
assert_table_eq(expected_cpu_table, &partitions);
@ -1483,7 +1483,7 @@ mod tests {
#[tokio::test]
async fn write_and_query() -> Result {
let db = Db::new("foo");
let db = MutableBufferDb::new("foo");
let lines: Vec<_> = parse_lines("cpu,region=west,host=A user=23.2,other=1i 10")
.map(|l| l.unwrap())
@ -1544,7 +1544,7 @@ mod tests {
let mem_columns = &["region", "host", "val", "time"];
let disk_columns = &["region", "host", "bytes", "used_percent", "time"];
{
let db = Db::try_with_wal("mydb", &mut dir).await?;
let db = MutableBufferDb::try_with_wal("mydb", &mut dir).await?;
let lines: Vec<_> = parse_lines("cpu,region=west,host=A user=23.2,other=1i,str=\"some string\",b=true 10\ndisk,region=west,host=A bytes=23432323i,used_percent=76.2 10").map(|l| l.unwrap()).collect();
db.write_lines(&lines).await?;
let lines: Vec<_> = parse_lines("cpu,region=west,host=B user=23.1 15")
@ -1585,7 +1585,7 @@ mod tests {
let (partitions, _stats) = restore_partitions_from_wal(wal_entries)?;
let db = Db {
let db = MutableBufferDb {
name,
partitions: RwLock::new(partitions),
wal_details: None,
@ -1640,7 +1640,7 @@ disk bytes=23432323i 1600136510000000000",
#[tokio::test]
async fn list_column_names() -> Result {
let mut dir = test_helpers::tmp_dir()?.into_path();
let db = Db::try_with_wal("column_namedb", &mut dir).await?;
let db = MutableBufferDb::try_with_wal("column_namedb", &mut dir).await?;
let lp_data = "h2o,state=CA,city=LA,county=LA temp=70.4 100\n\
h2o,state=MA,city=Boston,county=Suffolk temp=72.4 250\n\
@ -1768,7 +1768,7 @@ disk bytes=23432323i 1600136510000000000",
// Demonstration test to show column names with predicate working
let mut dir = test_helpers::tmp_dir()?.into_path();
let db = Db::try_with_wal("column_namedb", &mut dir).await?;
let db = MutableBufferDb::try_with_wal("column_namedb", &mut dir).await?;
let lp_data = "h2o,state=CA,city=LA,county=LA temp=70.4 100\n\
h2o,state=MA,city=Boston,county=Suffolk temp=72.4 250\n\
@ -1802,7 +1802,7 @@ disk bytes=23432323i 1600136510000000000",
#[tokio::test]
async fn list_column_values() -> Result {
let mut dir = test_helpers::tmp_dir()?.into_path();
let db = Db::try_with_wal("column_namedb", &mut dir).await?;
let db = MutableBufferDb::try_with_wal("column_namedb", &mut dir).await?;
let lp_data = "h2o,state=CA,city=LA temp=70.4 100\n\
h2o,state=MA,city=Boston temp=72.4 250\n\
@ -1958,7 +1958,7 @@ disk bytes=23432323i 1600136510000000000",
// correctly. There are more detailed tests in table.rs that
// test the generated queries.
let mut dir = test_helpers::tmp_dir()?.into_path();
let db = Db::try_with_wal("column_namedb", &mut dir).await?;
let db = MutableBufferDb::try_with_wal("column_namedb", &mut dir).await?;
let mut lp_lines = vec![
"h2o,state=MA,city=Boston temp=70.4 100", // to row 2
@ -2035,7 +2035,7 @@ disk bytes=23432323i 1600136510000000000",
async fn test_query_series_filter() -> Result {
// check the appropriate filters are applied in the datafusion plans
let mut dir = test_helpers::tmp_dir()?.into_path();
let db = Db::try_with_wal("column_namedb", &mut dir).await?;
let db = MutableBufferDb::try_with_wal("column_namedb", &mut dir).await?;
let lp_lines = vec![
"h2o,state=MA,city=Boston temp=70.4 100",
@ -2085,7 +2085,7 @@ disk bytes=23432323i 1600136510000000000",
#[tokio::test]
async fn test_query_series_pred_refers_to_column_not_in_table() -> Result {
let mut dir = test_helpers::tmp_dir()?.into_path();
let db = Db::try_with_wal("column_namedb", &mut dir).await?;
let db = MutableBufferDb::try_with_wal("column_namedb", &mut dir).await?;
let lp_lines = vec![
"h2o,state=MA,city=Boston temp=70.4 100",
@ -2145,7 +2145,9 @@ disk bytes=23432323i 1600136510000000000",
)]
async fn test_query_series_pred_neq() {
let mut dir = test_helpers::tmp_dir().unwrap().into_path();
let db = Db::try_with_wal("column_namedb", &mut dir).await.unwrap();
let db = MutableBufferDb::try_with_wal("column_namedb", &mut dir)
.await
.unwrap();
let lp_lines = vec![
"h2o,state=MA,city=Boston temp=70.4 100",
@ -2170,7 +2172,7 @@ disk bytes=23432323i 1600136510000000000",
// Ensure that the database queries are hooked up correctly
let mut dir = test_helpers::tmp_dir()?.into_path();
let db = Db::try_with_wal("column_namedb", &mut dir).await?;
let db = MutableBufferDb::try_with_wal("column_namedb", &mut dir).await?;
let lp_data = vec![
"h2o,state=MA,city=Boston temp=70.4 50",
@ -2266,7 +2268,7 @@ disk bytes=23432323i 1600136510000000000",
async fn test_field_columns_timestamp_predicate() -> Result {
// check the appropriate filters are applied in the datafusion plans
let mut dir = test_helpers::tmp_dir()?.into_path();
let db = Db::try_with_wal("column_namedb", &mut dir).await?;
let db = MutableBufferDb::try_with_wal("column_namedb", &mut dir).await?;
let lp_data = vec![
"h2o,state=MA,city=Boston temp=70.4 50",

View File

@ -17,6 +17,6 @@ mod table;
// Allow restore partitions to be used outside of this crate (for
// benchmarking)
pub use crate::database::Db;
pub use crate::database::MutableBufferDb;
pub use crate::partition::restore_partitions_from_wal;
pub use crate::store::WriteBufferDatabases;
pub use crate::store::MutableBufferDatabases;

View File

@ -461,7 +461,7 @@ impl ExpressionVisitor for SupportVisitor {
}
}
_ => panic!(
"Unsupported expression in write_buffer database: {:?}",
"Unsupported expression in mutable_buffer database: {:?}",
expr
),
}

View File

@ -7,7 +7,7 @@ use std::{fs, sync::Arc};
use std::{collections::BTreeMap, path::PathBuf};
use crate::database::Db;
use crate::database::MutableBufferDb;
#[derive(Debug, Snafu)]
pub enum Error {
@ -27,12 +27,12 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
pub struct WriteBufferDatabases {
databases: RwLock<BTreeMap<String, Arc<Db>>>,
pub struct MutableBufferDatabases {
databases: RwLock<BTreeMap<String, Arc<MutableBufferDb>>>,
base_dir: PathBuf,
}
impl WriteBufferDatabases {
impl MutableBufferDatabases {
pub fn new(base_dir: impl Into<PathBuf>) -> Self {
Self {
databases: RwLock::new(BTreeMap::new()),
@ -70,15 +70,15 @@ impl WriteBufferDatabases {
Ok(dirs)
}
pub async fn add_db(&self, db: Db) {
pub async fn add_db(&self, db: MutableBufferDb) {
let mut databases = self.databases.write().await;
databases.insert(db.name.clone(), Arc::new(db));
}
}
#[async_trait]
impl DatabaseStore for WriteBufferDatabases {
type Database = Db;
impl DatabaseStore for MutableBufferDatabases {
type Database = MutableBufferDb;
type Error = Error;
async fn db(&self, name: &str) -> Option<Arc<Self::Database>> {
@ -106,7 +106,7 @@ impl DatabaseStore for WriteBufferDatabases {
return Ok(db.clone());
}
let db = Db::try_with_wal(name, &mut self.base_dir.clone())
let db = MutableBufferDb::try_with_wal(name, &mut self.base_dir.clone())
.await
.context(DatabaseError)?;
let db = Arc::new(db);

View File

@ -180,7 +180,7 @@ pub trait DatabaseStore: Debug + Send + Sync {
// but when I do so then other modules can not find them. For example:
//
// error[E0433]: failed to resolve: could not find `test` in `storage`
// --> src/server/write_buffer_routes.rs:353:19
// --> src/server/mutable_buffer_routes.rs:353:19
// |
// 353 | use query::test::TestDatabaseStore;
// | ^^^^ could not find `test` in `query`

View File

@ -44,7 +44,7 @@ impl Store {
}
/// This method adds a partition to the segment store. It is probably what
/// the `WriteBuffer` will call.
/// the `MutableBuffer` will call.
///
/// The partition should comprise a single table (record batch) for each
/// measurement name in the partition.

View File

@ -15,7 +15,7 @@ data_types = { path = "../data_types" }
generated_types = { path = "../generated_types" }
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
query = { path = "../query" }
write_buffer = { path = "../write_buffer" }
mutable_buffer = { path = "../mutable_buffer" }
object_store = { path = "../object_store" }
tracing = "0.1"
tokio = { version = "0.2", features = ["full"] }

View File

@ -16,9 +16,9 @@ use data_types::{
{DatabaseName, DatabaseNameError},
};
use influxdb_line_protocol::ParsedLine;
use mutable_buffer::MutableBufferDb;
use object_store::ObjectStore;
use query::{DatabaseStore, SQLDatabase, TSDatabase};
use write_buffer::Db as WriteBufferDb;
use async_trait::async_trait;
use bytes::Bytes;
@ -123,7 +123,7 @@ impl<M: ConnectionManager> Server<M> {
let db_name = DatabaseName::new(db_name.into()).context(InvalidDatabaseName)?;
let buffer = if rules.store_locally {
Some(Arc::new(WriteBufferDb::new(db_name.to_string())))
Some(Arc::new(MutableBufferDb::new(db_name.to_string())))
} else {
None
};
@ -319,7 +319,7 @@ impl<M: ConnectionManager> Server<M> {
Ok(())
}
pub async fn db(&self, name: &DatabaseName<'_>) -> Option<Arc<WriteBufferDb>> {
pub async fn db(&self, name: &DatabaseName<'_>) -> Option<Arc<MutableBufferDb>> {
let config = self.config.read().await;
config
.databases
@ -330,7 +330,7 @@ impl<M: ConnectionManager> Server<M> {
#[async_trait]
impl DatabaseStore for Server<ConnectionManagerImpl> {
type Database = WriteBufferDb;
type Database = MutableBufferDb;
type Error = Error;
async fn db(&self, name: &str) -> Option<Arc<Self::Database>> {
@ -395,7 +395,7 @@ pub struct Db {
#[serde(flatten)]
pub rules: DatabaseRules,
#[serde(skip)]
pub local_store: Option<Arc<WriteBufferDb>>,
pub local_store: Option<Arc<MutableBufferDb>>,
#[serde(skip)]
wal_buffer: Option<Buffer>,
#[serde(skip)]

View File

@ -325,8 +325,8 @@ mod tests {
use data_types::database_rules::DatabaseRules;
use futures::TryStreamExt;
use influxdb_line_protocol::parse_lines;
use mutable_buffer::partition::Partition as PartitionWB;
use object_store::InMemory;
use write_buffer::partition::Partition as PartitionWB;
#[tokio::test]
async fn snapshot() {

View File

@ -25,13 +25,13 @@ pub enum Error {
},
#[snafu(display("Unable to initialize database in directory {:?}: {}", db_dir, source))]
InitializingWriteBuffer {
InitializingMutableBuffer {
db_dir: PathBuf,
source: Box<dyn std::error::Error + Send + Sync>,
},
#[snafu(display("Unable to restore WAL from directory {:?}: {}", dir, source))]
RestoringWriteBuffer {
RestoringMutableBuffer {
dir: PathBuf,
source: Box<dyn std::error::Error + Send + Sync>,
},

View File

@ -1,8 +1,5 @@
//! This module contains a parallel implementation of the /v2 HTTP api
//! routes for InfluxDB IOx based on the WriteBuffer storage implementation.
//!
//! The goal is that eventually the implementation in these routes
//! will replace the implementation in http_routes.rs
//! This module contains a partial implementation of the /v2 HTTP api
//! routes for InfluxDB IOx.
//!
//! Note that these routes are designed to be just helpers for now,
//! and "close enough" to the real /v2 api to be able to test InfluxDB IOx
@ -10,7 +7,7 @@
//! id (this is done by other services in the influx cloud)
//!
//! Long term, we expect to create IOx specific api in terms of
//! database names and may remove this quasi /v2 API from the Deloren.
//! database names and may remove this quasi /v2 API.
use http::header::CONTENT_ENCODING;
use tracing::{debug, error, info};