feat: add writer_id and object_store in Db

pull/24376/head
Nga Tran 2021-04-07 18:36:07 -04:00
parent d59c5de39a
commit be6e1e48e4
18 changed files with 318 additions and 117 deletions

Cargo.lock generated
View File

@ -1504,6 +1504,7 @@ dependencies = [
"generated_types",
"http",
"hyper",
"object_store",
"prost",
"rand 0.8.3",
"serde",

View File

@ -2,6 +2,7 @@
//! including schema, summary statistics, and file locations in storage.
use std::fmt::{Debug, Display};
use std::mem;
use serde::{Deserialize, Serialize};
@ -68,6 +69,14 @@ impl TableSummary {
}
}
pub fn size(&self) -> usize {
let size: usize = self.columns.iter().map(|c| c.size()).sum();
size + self.name.len() + mem::size_of::<Self>()
}
pub fn has_table(&self, table_name: &str) -> bool {
self.name.eq(table_name)
}
/// Updates the table summary with combined stats from the other. Counts are
/// treated as non-overlapping so they're just added together. If the
/// type of a column differs between the two tables, no update is done
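To make the merge rules just described concrete, here is a minimal, self-contained sketch; the simplified `Stats` enum is a stand-in for the crate's real `Statistics` type (an assumption), but the behaviour mirrors the doc comment: counts from the other summary are added, and a column whose statistics type differs is left untouched.

#[derive(Debug, PartialEq)]
enum Stats {
    I64 { count: u64 },
    F64 { count: u64 },
}

// Counts are treated as non-overlapping, so matching types simply add;
// a type mismatch is a no-op, as documented above.
fn update_from(mine: &mut Stats, other: &Stats) {
    match (mine, other) {
        (Stats::I64 { count }, Stats::I64 { count: other_count }) => *count += *other_count,
        (Stats::F64 { count }, Stats::F64 { count: other_count }) => *count += *other_count,
        _ => {}
    }
}

fn main() {
    let mut a = Stats::I64 { count: 10 };
    update_from(&mut a, &Stats::I64 { count: 5 });
    assert_eq!(a, Stats::I64 { count: 15 }); // counts added together

    let mut b = Stats::I64 { count: 10 };
    update_from(&mut b, &Stats::F64 { count: 5 });
    assert_eq!(b, Stats::I64 { count: 10 }); // differing type: unchanged
}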
@ -106,6 +115,11 @@ impl ColumnSummary {
self.stats.count()
}
/// Return size in bytes of this Column
pub fn size(&self) -> usize {
mem::size_of::<Self>() + self.name.len() + mem::size_of_val(&self.stats)
}
// Updates statistics from other if the same type, otherwise a noop
pub fn update_from(&mut self, other: &Self) {
match (&mut self.stats, &other.stats) {

View File

@ -17,6 +17,7 @@ generated_types = { path = "../generated_types" }
futures-util = { version = "0.3.1", optional = true }
http = "0.2.3"
hyper = "0.14"
object_store = { path = "../object_store" }
prost = "0.7"
serde = "1.0.118"
serde_json = { version = "1.0.44", optional = true }

View File

@ -13,13 +13,15 @@ a database named `telemetry`:
fn main() {
use data_types::database_rules::DatabaseRules;
use influxdb_iox_client::ClientBuilder;
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let server_id = NonZeroU32::new(1).unwrap();
let client = ClientBuilder::default()
.build("http://127.0.0.1:8080")
.expect("client should be valid");
client
.create_database("telemetry", &DatabaseRules::default())
.create_database("telemetry", &DatabaseRules::default(), server_id, store)
.await
.expect("failed to create database");
}
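Note that the updated example above also needs the corresponding items in scope; a hedged sketch of the imports, matching the paths used elsewhere in this commit:

use std::num::NonZeroU32;
use std::sync::Arc;
use object_store::{memory::InMemory, ObjectStore};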

View File

@ -1,12 +1,13 @@
use std::num::NonZeroU32;
use thiserror::Error;
use self::generated_types::{management_service_client::ManagementServiceClient, *};
use crate::connection::Connection;
use ::generated_types::google::longrunning::Operation;
//use object_store::ObjectStore;
use std::convert::TryInto;
use std::num::NonZeroU32;
/// Re-export generated_types
pub mod generated_types {
@ -257,8 +258,11 @@ impl Client {
pub async fn create_database(
&mut self,
rules: DatabaseRules,
/* server_id: NonZeroU32,
* object_store: ObjectStore, */
) -> Result<(), CreateDatabaseError> {
self.inner
//.create_database(CreateDatabaseRequest { rules: Some(rules) }, server_id, object_store)
.create_database(CreateDatabaseRequest { rules: Some(rules) })
.await
.map_err(|status| match status.code() {

View File

@ -5,6 +5,8 @@ use data_types::partition_metadata::TableSummary;
use object_store::path::Path;
use tracker::{MemRegistry, MemTracker};
use std::mem;
#[derive(Debug)]
pub struct Chunk {
/// Partition this chunk belongs to
@ -32,27 +34,32 @@ impl Chunk {
chunk
}
/// Add a chunk's table and its summary
pub fn add_table(&mut self, table_summary: TableSummary, file_location: Path) {
self.tables.push(Table::new(table_summary, file_location));
}
/// Return true if this chunk includes the given table
pub fn has_table(&self, table_name: &str) -> bool {
// TODO: check if this table exists in the chunk
if table_name.is_empty() {
return false;
}
true
self.tables.iter().any(|t| t.has_table(table_name))
}
// Return all tables of this chunk
pub fn all_table_names(&self, names: &mut BTreeSet<String>) {
// TODO
names.insert("todo".to_string());
let mut tables = self
.tables
.iter()
.map(|t| t.name())
.collect::<BTreeSet<String>>();
names.append(&mut tables);
}
/// Return the approximate memory size of the chunk, in bytes including the
/// dictionary, tables, and their rows.
pub fn size(&self) -> usize {
// TODO
0
let size: usize = self.tables.iter().map(|t| t.size()).sum();
size + self.partition_key.len() + mem::size_of::<u32>() + mem::size_of::<Self>()
}
}

View File

@ -16,6 +16,7 @@ use parking_lot::Mutex;
use snafu::{OptionExt, ResultExt, Snafu};
use std::{
io::{Cursor, Seek, SeekFrom, Write},
num::NonZeroU32,
sync::Arc,
};
@ -52,12 +53,12 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, Clone)]
pub struct Storage {
object_store: Arc<ObjectStore>,
writer_id: u32,
writer_id: NonZeroU32,
db_name: String,
}
impl Storage {
pub fn new(store: Arc<ObjectStore>, id: u32, db: String) -> Self {
pub fn new(store: Arc<ObjectStore>, id: NonZeroU32, db: String) -> Self {
Self {
object_store: store,
writer_id: id,
@ -77,7 +78,6 @@ impl Storage {
// <writer id>/<database>/data/<partition key>/<chunk id>/<table
// name>.parquet
// let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let mut path = self.object_store.new_path();
path.push_dir(self.writer_id.to_string());
path.push_dir(self.db_name.clone());
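The comment above documents the object-store key layout for persisted chunk data: <writer id>/<database>/data/<partition key>/<chunk id>/<table name>.parquet. Below is a hedged sketch of how such a key could be assembled with the path calls visible in this diff (new_path, push_dir, set_file_name); the chunk_data_path helper name is hypothetical and the argument types are assumptions based on the surrounding calls.

use std::num::NonZeroU32;
use std::sync::Arc;
use object_store::{memory::InMemory, path::ObjectStorePath, ObjectStore};

// Hypothetical helper: <writer id>/<database>/data/<partition key>/<chunk id>/<table>.parquet
fn chunk_data_path(
    store: &ObjectStore,
    writer_id: NonZeroU32,
    db_name: &str,
    partition_key: &str,
    chunk_id: u32,
    table_file_name: &str, // e.g. "cpu.parquet"
) -> object_store::path::Path {
    let mut path = store.new_path();
    path.push_dir(writer_id.to_string());
    path.push_dir(db_name.to_string());
    path.push_dir("data");
    path.push_dir(partition_key.to_string());
    path.push_dir(chunk_id.to_string());
    path.set_file_name(table_file_name);
    path
}

fn main() {
    let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
    let writer_id = NonZeroU32::new(1).unwrap();
    let path = chunk_data_path(&store, writer_id, "db_name", "1970-01-01T00", 0, "cpu.parquet");
    println!("{:?}", path);
}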

View File

@ -1,6 +1,8 @@
use data_types::partition_metadata::TableSummary;
use object_store::path::Path;
use std::mem;
/// Table that belongs to a chunk persisted in a parquet file in object store
#[derive(Debug, Clone)]
pub struct Table {
@ -20,4 +22,21 @@ impl Table {
object_store_path: path,
}
}
pub fn has_table(&self, table_name: &str) -> bool {
self.table_summary.has_table(table_name)
}
/// Return the approximate memory size of the chunk, in bytes including the
/// dictionary, tables, and their rows.
pub fn size(&self) -> usize {
mem::size_of::<Self>()
+ self.table_summary.size()
+ mem::size_of_val(&self.object_store_path)
}
/// Return name of this table
pub fn name(&self) -> String {
self.table_summary.name.clone()
}
}

View File

@ -514,7 +514,7 @@ impl DatabaseStore for TestDatabaseStore {
/// (handles creating sequence numbers and writer ids
#[derive(Debug, Default)]
pub struct TestLPWriter {
writer_id: u32,
pub writer_id: u32,
sequence_number: u64,
}

View File

@ -1,5 +1,6 @@
use std::{
collections::{BTreeMap, BTreeSet},
num::NonZeroU32,
sync::{Arc, RwLock},
};
@ -7,7 +8,7 @@ use data_types::{
database_rules::{DatabaseRules, WriterId},
DatabaseName,
};
use object_store::path::ObjectStorePath;
use object_store::{path::ObjectStorePath, ObjectStore};
use read_buffer::Database as ReadBufferDb;
/// This module contains code for managing the configuration of the server.
@ -82,7 +83,7 @@ impl Config {
state.remotes.remove(&id)
}
fn commit(&self, rules: DatabaseRules) {
fn commit(&self, rules: DatabaseRules, server_id: NonZeroU32, object_store: Arc<ObjectStore>) {
let mut state = self.state.write().expect("mutex poisoned");
let name = state
.reservations
@ -98,6 +99,8 @@ impl Config {
let wal_buffer = rules.wal_buffer_config.as_ref().map(Into::into);
let db = Arc::new(Db::new(
rules,
server_id,
object_store,
read_buffer,
wal_buffer,
Arc::clone(&self.jobs),
@ -221,8 +224,9 @@ pub(crate) struct CreateDatabaseHandle<'a> {
}
impl<'a> CreateDatabaseHandle<'a> {
pub(crate) fn commit(mut self) {
self.config.commit(self.rules.take().unwrap())
pub(crate) fn commit(mut self, server_id: NonZeroU32, object_store: Arc<ObjectStore>) {
self.config
.commit(self.rules.take().unwrap(), server_id, object_store)
}
pub(crate) fn rules(&self) -> &DatabaseRules {
@ -257,7 +261,9 @@ mod test {
}
let db_reservation = config.create_db(rules).unwrap();
db_reservation.commit();
let server_id = NonZeroU32::new(1).unwrap();
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
db_reservation.commit(server_id, store);
assert!(config.db(&name).is_some());
assert_eq!(config.db_names_sorted(), vec![name.clone()]);
@ -281,7 +287,9 @@ mod test {
let rules = DatabaseRules::new(name.clone());
let db_reservation = config.create_db(rules).unwrap();
db_reservation.commit();
let server_id = NonZeroU32::new(1).unwrap();
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
db_reservation.commit(server_id, store);
let token = config
.state

View File

@ -2,9 +2,12 @@
//! instances of the mutable buffer, read buffer, and object store
use std::any::Any;
use std::sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc,
use std::{
num::NonZeroU32,
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc,
},
};
use async_trait::async_trait;
@ -23,7 +26,7 @@ use data_types::{
chunk::ChunkSummary, database_rules::DatabaseRules, partition_metadata::PartitionSummary,
};
use internal_types::{data::ReplicatedWrite, selection::Selection};
use object_store::{memory::InMemory, ObjectStore};
use object_store::ObjectStore;
use parquet_file::{chunk::Chunk, storage::Storage};
use query::{Database, DEFAULT_SCHEMA};
use read_buffer::Database as ReadBufferDb;
@ -115,20 +118,6 @@ pub enum Error {
source: parquet_file::storage::Error,
},
// #[snafu(display("Error opening Parquet Writer: {}", source))]
// OpeningParquetWriter {
// source: parquet::errors::ParquetError,
// },
// #[snafu(display("Error writing Parquet to memory: {}", source))]
// WritingParquetToMemory {
// source: parquet::errors::ParquetError,
// },
// #[snafu(display("Error closing Parquet Writer: {}", source))]
// ClosingParquetWriter {
// source: parquet::errors::ParquetError,
// },
#[snafu(display("Unknown Mutable Buffer Chunk {}", chunk_id))]
UnknownMutableBufferChunk { chunk_id: u32 },
@ -205,6 +194,10 @@ const STARTING_SEQUENCE: u64 = 1;
pub struct Db {
pub rules: RwLock<DatabaseRules>,
pub server_id: NonZeroU32, // this is also the Query Server ID
pub store: Arc<ObjectStore>,
/// The metadata catalog
catalog: Arc<Catalog>,
@ -246,17 +239,23 @@ struct MemoryRegistries {
impl Db {
pub fn new(
rules: DatabaseRules,
server_id: NonZeroU32,
object_store: Arc<ObjectStore>,
read_buffer: ReadBufferDb,
wal_buffer: Option<Buffer>,
jobs: Arc<JobRegistry>,
) -> Self {
let rules = RwLock::new(rules);
let server_id = server_id;
let store = Arc::clone(&object_store);
let wal_buffer = wal_buffer.map(Mutex::new);
let read_buffer = Arc::new(read_buffer);
let catalog = Arc::new(Catalog::new());
let system_tables = Arc::new(SystemSchemaProvider::new(Arc::clone(&catalog)));
Self {
rules,
server_id,
store,
catalog,
read_buffer,
wal_buffer,
@ -488,16 +487,18 @@ impl Db {
self.memory_registries.parquet.as_ref(),
);
// Create a storage to save data of this chunk
// Todo: this must be gotten from server or somewhere
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let storage = Storage::new(store, 100, "db_name".to_string()); // todo: replace with actual writer_id & db_name
let storage = Storage::new(
Arc::clone(&self.store),
self.server_id,
self.rules.read().name.to_string(),
);
for stats in table_stats {
debug!(%partition_key, %chunk_id, table=%stats.name, "loading table to object store");
let predicate = read_buffer::Predicate::default();
// Get RecordrdBatchStream of data from the read buffer chunk
// Get RecordBatchStream of data from the read buffer chunk
// TODO: When we have the rb_chunk, the following code will be replaced with one
// line let stream = rb_chunk.read_filter()
let read_results = read_buffer
@ -753,6 +754,7 @@ mod tests {
database_rules::{Order, Sort, SortOrder},
partition_metadata::{ColumnSummary, StatValues, Statistics, TableSummary},
};
use object_store::memory::InMemory;
use query::{
exec::Executor, frontend::sql::SQLQueryPlanner, test::TestLPWriter, PartitionChunk,
};
@ -766,10 +768,13 @@ mod tests {
#[tokio::test]
async fn write_no_mutable_buffer() {
// Validate that writes are rejected if there is no mutable buffer
let db = make_db();
db.rules.write().lifecycle_rules.immutable = true;
let mut writer = TestLPWriter::default();
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let writer_id: NonZeroU32 = NonZeroU32::new(1).unwrap();
let db = make_db(writer_id, store);
db.rules.write().lifecycle_rules.immutable = true;
let res = writer.write_lp_string(&db, "cpu bar=1 10");
assert_contains!(
res.unwrap_err().to_string(),
@ -779,8 +784,11 @@ mod tests {
#[tokio::test]
async fn read_write() {
let db = Arc::new(make_db());
let mut writer = TestLPWriter::default();
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let writer_id: NonZeroU32 = NonZeroU32::new(1).unwrap();
let db = Arc::new(make_db(writer_id, store));
writer.write_lp_string(db.as_ref(), "cpu bar=1 10").unwrap();
let batches = run_query(db, "select * from cpu").await;
@ -797,8 +805,12 @@ mod tests {
#[tokio::test]
async fn write_with_rollover() {
let db = Arc::new(make_db());
let mut writer = TestLPWriter::default();
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let writer_id: NonZeroU32 = NonZeroU32::new(1).unwrap();
let db = Arc::new(make_db(writer_id, store));
//writer.write_lp_string(db.as_ref(), "cpu bar=1 10").unwrap();
writer.write_lp_string(db.as_ref(), "cpu bar=1 10").unwrap();
assert_eq!(vec!["1970-01-01T00"], db.partition_keys().unwrap());
@ -838,8 +850,10 @@ mod tests {
#[tokio::test]
async fn write_with_missing_tags_are_null() {
let db = Arc::new(make_db());
let mut writer = TestLPWriter::default();
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let writer_id: NonZeroU32 = NonZeroU32::new(1).unwrap();
let db = Arc::new(make_db(writer_id, store));
// Note the `region` tag is introduced in the second line, so
// the values in prior rows for the region column are
@ -875,8 +889,11 @@ mod tests {
#[tokio::test]
async fn read_from_read_buffer() {
// Test that data can be loaded into the ReadBuffer
let db = Arc::new(make_db());
let mut writer = TestLPWriter::default();
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let writer_id: NonZeroU32 = NonZeroU32::new(1).unwrap();
let db = Arc::new(make_db(writer_id, store));
writer.write_lp_string(db.as_ref(), "cpu bar=1 10").unwrap();
writer.write_lp_string(db.as_ref(), "cpu bar=2 20").unwrap();
@ -923,11 +940,13 @@ mod tests {
#[tokio::test]
async fn write_updates_last_write_at() {
let db = make_db();
let before_create = Utc::now();
let partition_key = "1970-01-01T00";
let mut writer = TestLPWriter::default();
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let writer_id: NonZeroU32 = NonZeroU32::new(1).unwrap();
let db = make_db(writer_id, store);
let before_create = Utc::now();
let partition_key = "1970-01-01T00";
writer.write_lp_string(&db, "cpu bar=1 10").unwrap();
let after_write = Utc::now();
@ -952,10 +971,11 @@ mod tests {
#[tokio::test]
async fn test_chunk_timestamps() {
let start = Utc::now();
let db = make_db();
// Given data loaded into two chunks
let mut writer = TestLPWriter::default();
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let writer_id: NonZeroU32 = NonZeroU32::new(1).unwrap();
let db = make_db(writer_id, store);
writer.write_lp_string(&db, "cpu bar=1 10").unwrap();
let after_data_load = Utc::now();
@ -985,11 +1005,14 @@ mod tests {
#[tokio::test]
async fn test_chunk_closing() {
let db = make_db();
let mut writer = TestLPWriter::default();
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let writer_id: NonZeroU32 = NonZeroU32::new(1).unwrap();
let db = make_db(writer_id, store);
db.rules.write().lifecycle_rules.mutable_size_threshold =
Some(NonZeroUsize::new(2).unwrap());
let mut writer = TestLPWriter::default();
writer.write_lp_string(&db, "cpu bar=1 10").unwrap();
writer.write_lp_string(&db, "cpu bar=1 20").unwrap();
@ -1007,8 +1030,11 @@ mod tests {
#[tokio::test]
async fn chunks_sorted_by_times() {
let db = make_db();
let mut writer = TestLPWriter::default();
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let writer_id: NonZeroU32 = NonZeroU32::new(1).unwrap(); // 1 is not set so cannot be used here
let db = make_db(writer_id, store);
writer.write_lp_string(&db, "cpu val=1 1").unwrap();
writer
.write_lp_string(&db, "mem val=2 400000000000001")
@ -1045,9 +1071,12 @@ mod tests {
#[tokio::test]
async fn chunk_id_listing() {
// Test that chunk id listing is hooked up
let db = make_db();
let partition_key = "1970-01-01T00";
let mut writer = TestLPWriter::default();
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let writer_id: NonZeroU32 = NonZeroU32::new(1).unwrap();
let db = make_db(writer_id, store);
let partition_key = "1970-01-01T00";
writer.write_lp_string(&db, "cpu bar=1 10").unwrap();
writer.write_lp_string(&db, "cpu bar=1 20").unwrap();
@ -1097,8 +1126,10 @@ mod tests {
#[tokio::test]
async fn partition_chunk_summaries() {
// Test that chunk id listing is hooked up
let db = make_db();
let mut writer = TestLPWriter::default();
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let writer_id: NonZeroU32 = NonZeroU32::new(1).unwrap();
let db = make_db(writer_id, store);
writer.write_lp_string(&db, "cpu bar=1 1").unwrap();
db.rollover_partition("1970-01-01T00").await.unwrap();
@ -1142,8 +1173,10 @@ mod tests {
#[tokio::test]
async fn partition_chunk_summaries_timestamp() {
let db = make_db();
let mut writer = TestLPWriter::default();
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let writer_id: NonZeroU32 = NonZeroU32::new(1).unwrap();
let db = make_db(writer_id, store);
let start = Utc::now();
writer.write_lp_string(&db, "cpu bar=1 1").unwrap();
@ -1195,8 +1228,10 @@ mod tests {
#[tokio::test]
async fn chunk_summaries() {
// Test that chunk id listing is hooked up
let db = make_db();
let mut writer = TestLPWriter::default();
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let writer_id: NonZeroU32 = NonZeroU32::new(1).unwrap();
let db = make_db(writer_id, store);
// get three chunks: one open, one closed in mb and one close in rb
writer.write_lp_string(&db, "cpu bar=1 1").unwrap();
@ -1269,8 +1304,10 @@ mod tests {
#[tokio::test]
async fn partition_summaries() {
// Test that chunk id listing is hooked up
let db = make_db();
let mut writer = TestLPWriter::default();
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let writer_id: NonZeroU32 = NonZeroU32::new(1).unwrap();
let db = make_db(writer_id, store);
writer.write_lp_string(&db, "cpu bar=1 1").unwrap();
let chunk_id = db.rollover_partition("1970-01-01T00").await.unwrap().id();

View File

@ -192,6 +192,7 @@ pub struct Server<M: ConnectionManager> {
jobs: Arc<JobRegistry>,
}
//impl<M: ConnectionManager + std::marker::Send + Sync> Server<M> {
impl<M: ConnectionManager> Server<M> {
pub fn new(connection_manager: M, store: Arc<ObjectStore>) -> Self {
let jobs = Arc::new(JobRegistry::new());
@ -220,7 +221,12 @@ impl<M: ConnectionManager> Server<M> {
}
/// Tells the server the set of rules for a database.
pub async fn create_database(&self, rules: DatabaseRules) -> Result<()> {
pub async fn create_database(
&self,
rules: DatabaseRules,
server_id: NonZeroU32,
object_store: Arc<ObjectStore>,
) -> Result<()> {
// Return an error if this server hasn't yet been setup with an id
self.require_id()?;
let db_reservation = self.config.create_db(rules)?;
@ -228,7 +234,7 @@ impl<M: ConnectionManager> Server<M> {
self.persist_database_rules(db_reservation.rules().clone())
.await?;
db_reservation.commit();
db_reservation.commit(server_id, object_store);
Ok(())
}
@ -281,6 +287,7 @@ impl<M: ConnectionManager> Server<M> {
.map(|mut path| {
let store = Arc::clone(&self.store);
let config = Arc::clone(&self.config);
let server_id = self.require_id().unwrap(); //todo: return error instead
path.set_file_name(DB_RULES_FILE_NAME);
@ -306,7 +313,7 @@ impl<M: ConnectionManager> Server<M> {
}
Ok(rules) => match config.create_db(rules) {
Err(e) => error!("error adding database to config: {}", e),
Ok(handle) => handle.commit(),
Ok(handle) => handle.commit(server_id, store),
},
}
})
@ -535,8 +542,12 @@ where
let db = match self.db(&db_name) {
Some(db) => db,
None => {
self.create_database(DatabaseRules::new(db_name.clone()))
.await?;
self.create_database(
DatabaseRules::new(db_name.clone()),
self.require_id()?,
Arc::clone(&self.store),
)
.await?;
self.db(&db_name).expect("db not inserted")
}
};
@ -655,8 +666,7 @@ mod tests {
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let server = Server::new(manager, store);
let rules = DatabaseRules::new(DatabaseName::new("foo").unwrap());
let resp = server.create_database(rules).await.unwrap_err();
let resp = server.require_id().unwrap_err();
assert!(matches!(resp, Error::IdNotSet));
let lines = parsed_lines("cpu foo=1 10");
@ -685,7 +695,11 @@ mod tests {
// Create a database
server
.create_database(rules.clone())
.create_database(
rules.clone(),
server.require_id().unwrap(),
Arc::clone(&server.store),
)
.await
.expect("failed to create database");
@ -710,7 +724,11 @@ mod tests {
let db2 = DatabaseName::new("db_awesome").unwrap();
server
.create_database(DatabaseRules::new(db2.clone()))
.create_database(
DatabaseRules::new(db2.clone()),
server.require_id().unwrap(),
Arc::clone(&server.store),
)
.await
.expect("failed to create 2nd db");
@ -738,13 +756,21 @@ mod tests {
// Create a database
server
.create_database(DatabaseRules::new(name.clone()))
.create_database(
DatabaseRules::new(name.clone()),
server.require_id().unwrap(),
Arc::clone(&server.store),
)
.await
.expect("failed to create database");
// Then try and create another with the same name
let got = server
.create_database(DatabaseRules::new(name.clone()))
.create_database(
DatabaseRules::new(name.clone()),
server.require_id().unwrap(),
Arc::clone(&server.store),
)
.await
.unwrap_err();
@ -765,7 +791,11 @@ mod tests {
for name in &names {
let name = DatabaseName::new(name.to_string()).unwrap();
server
.create_database(DatabaseRules::new(name))
.create_database(
DatabaseRules::new(name),
server.require_id().unwrap(),
Arc::clone(&server.store),
)
.await
.expect("failed to create database");
}
@ -783,7 +813,11 @@ mod tests {
let name = DatabaseName::new("foo".to_string()).unwrap();
server
.create_database(DatabaseRules::new(name))
.create_database(
DatabaseRules::new(name),
server.require_id().unwrap(),
Arc::clone(&server.store),
)
.await
.unwrap();
@ -826,7 +860,11 @@ mod tests {
let db_name = DatabaseName::new("foo").unwrap();
server
.create_database(DatabaseRules::new(db_name.clone()))
.create_database(
DatabaseRules::new(db_name.clone()),
server.require_id().unwrap(),
Arc::clone(&server.store),
)
.await
.unwrap();
@ -896,7 +934,14 @@ mod tests {
lifecycle_rules: Default::default(),
shard_config: None,
};
server.create_database(rules).await.unwrap();
server
.create_database(
rules,
server.require_id().unwrap(),
Arc::clone(&server.store),
)
.await
.unwrap();
let lines = parsed_lines("disk,host=a used=10.1 12");
server.write_lines(db_name.as_str(), &lines).await.unwrap();

View File

@ -3,6 +3,7 @@ use crate::query_tests::{scenarios::*, utils::make_db};
use arrow_deps::{arrow::util::pretty::pretty_format_batches, datafusion::prelude::*};
use async_trait::async_trait;
use object_store::{memory::InMemory, ObjectStore};
use query::{
exec::Executor,
frontend::influxrpc::InfluxRPCPlanner,
@ -10,6 +11,7 @@ use query::{
predicate::{Predicate, PredicateBuilder},
test::TestLPWriter,
};
use std::{num::NonZeroU32, sync::Arc};
/// runs read_window_aggregate(predicate) and compares it to the expected
/// output
@ -160,9 +162,11 @@ impl DBSetup for MeasurementForWindowAggregateMonths {
];
// partition keys are: ["2020-03-02T00", "2020-03-01T00", "2020-04-01T00",
// "2020-04-02T00"]
let db = make_db();
let mut writer = TestLPWriter::default();
let writer_id: NonZeroU32 = NonZeroU32::new(1).unwrap();
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let db = make_db(writer_id, Arc::clone(&store));
let data = lp_lines.join("\n");
writer.write_lp_string(&db, &data).unwrap();
let scenario1 = DBScenario {
@ -170,8 +174,9 @@ impl DBSetup for MeasurementForWindowAggregateMonths {
db,
};
let db = make_db();
let mut writer = TestLPWriter::default();
let writer_id: NonZeroU32 = NonZeroU32::new(1).unwrap();
let db = make_db(writer_id, Arc::clone(&store));
let data = lp_lines.join("\n");
writer.write_lp_string(&db, &data).unwrap();
db.rollover_partition("2020-03-01T00").await.unwrap();
@ -183,8 +188,9 @@ impl DBSetup for MeasurementForWindowAggregateMonths {
db,
};
let db = make_db();
let mut writer = TestLPWriter::default();
let writer_id: NonZeroU32 = NonZeroU32::new(1).unwrap();
let db = make_db(writer_id, Arc::clone(&store));
let data = lp_lines.join("\n");
writer.write_lp_string(&db, &data).unwrap();
rollover_and_load(&db, "2020-03-01T00").await;

View File

@ -3,6 +3,8 @@
use query::{test::TestLPWriter, PartitionChunk};
use async_trait::async_trait;
use object_store::{memory::InMemory, ObjectStore};
use std::{num::NonZeroU32, sync::Arc};
use crate::db::Db;
@ -26,8 +28,11 @@ pub struct NoData {}
#[async_trait]
impl DBSetup for NoData {
async fn make(&self) -> Vec<DBScenario> {
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let writer_id: NonZeroU32 = NonZeroU32::new(1).unwrap();
let db = make_db(writer_id, Arc::clone(&store));
let partition_key = "1970-01-01T00";
let db = make_db();
let scenario1 = DBScenario {
scenario_name: "New, Empty Database".into(),
db,
@ -35,7 +40,7 @@ impl DBSetup for NoData {
// listing partitions (which may create an entry in a map)
// in an empty database
let db = make_db();
let db = make_db(writer_id, Arc::clone(&store));
assert_eq!(count_mutable_buffer_chunks(&db), 0);
assert_eq!(count_read_buffer_chunks(&db), 0);
let scenario2 = DBScenario {
@ -44,9 +49,11 @@ impl DBSetup for NoData {
};
// a scenario where the database has had data loaded and then deleted
let db = make_db();
let data = "cpu,region=west user=23.2 100";
let mut writer = TestLPWriter::default();
let writer_id: NonZeroU32 = NonZeroU32::new(1).unwrap();
let db = make_db(writer_id, Arc::clone(&store));
let data = "cpu,region=west user=23.2 100";
writer.write_lp_string(&db, data).unwrap();
// move data out of open chunk
assert_eq!(db.rollover_partition(partition_key).await.unwrap().id(), 0);
@ -172,8 +179,10 @@ pub struct TwoMeasurementsManyFieldsOneChunk {}
#[async_trait]
impl DBSetup for TwoMeasurementsManyFieldsOneChunk {
async fn make(&self) -> Vec<DBScenario> {
let db = make_db();
let mut writer = TestLPWriter::default();
let writer_id: NonZeroU32 = NonZeroU32::new(1).unwrap();
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let db = make_db(writer_id, Arc::clone(&store));
let lp_lines = vec![
"h2o,state=MA,city=Boston temp=70.4 50",
@ -229,8 +238,11 @@ impl DBSetup for EndToEndTest {
let lp_data = lp_lines.join("\n");
let db = make_db();
let mut writer = TestLPWriter::default();
let writer_id: NonZeroU32 = NonZeroU32::new(1).unwrap();
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let db = make_db(writer_id, Arc::clone(&store));
let res = writer.write_lp_string(&db, &lp_data);
assert!(res.is_ok(), "Error: {}", res.unwrap_err());
@ -249,15 +261,18 @@ impl DBSetup for EndToEndTest {
/// Data in both read buffer and mutable buffer chunk
/// Data in one only read buffer chunk
pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) -> Vec<DBScenario> {
let db = make_db();
let mut writer = TestLPWriter::default();
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let writer_id: NonZeroU32 = NonZeroU32::new(1).unwrap();
let db = make_db(writer_id, Arc::clone(&store));
writer.write_lp_string(&db, data).unwrap();
let scenario1 = DBScenario {
scenario_name: "Data in open chunk of mutable buffer".into(),
db,
};
let db = make_db();
let db = make_db(writer_id, Arc::clone(&store));
let mut writer = TestLPWriter::default();
writer.write_lp_string(&db, data).unwrap();
db.rollover_partition(partition_key).await.unwrap();
@ -266,7 +281,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) ->
db,
};
let db = make_db();
let db = make_db(writer_id, Arc::clone(&store));
let mut writer = TestLPWriter::default();
writer.write_lp_string(&db, data).unwrap();
db.rollover_partition(partition_key).await.unwrap();
@ -292,8 +307,11 @@ pub async fn make_two_chunk_scenarios(
data1: &str,
data2: &str,
) -> Vec<DBScenario> {
let db = make_db();
let mut writer = TestLPWriter::default();
let writer_id: NonZeroU32 = NonZeroU32::new(1).unwrap();
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let db = make_db(writer_id, Arc::clone(&store));
writer.write_lp_string(&db, data1).unwrap();
writer.write_lp_string(&db, data2).unwrap();
let scenario1 = DBScenario {
@ -302,8 +320,10 @@ pub async fn make_two_chunk_scenarios(
};
// spread across 2 mutable buffer chunks
let db = make_db();
let mut writer = TestLPWriter::default();
let writer_id: NonZeroU32 = NonZeroU32::new(1).unwrap();
let db = make_db(writer_id, Arc::clone(&store));
writer.write_lp_string(&db, data1).unwrap();
db.rollover_partition(partition_key).await.unwrap();
writer.write_lp_string(&db, data2).unwrap();
@ -313,8 +333,10 @@ pub async fn make_two_chunk_scenarios(
};
// spread across 1 mutable buffer, 1 read buffer chunks
let db = make_db();
let mut writer = TestLPWriter::default();
let writer_id: NonZeroU32 = NonZeroU32::new(1).unwrap();
let db = make_db(writer_id, Arc::clone(&store));
writer.write_lp_string(&db, data1).unwrap();
db.rollover_partition(partition_key).await.unwrap();
db.load_chunk_to_read_buffer(partition_key, 0)
@ -327,8 +349,10 @@ pub async fn make_two_chunk_scenarios(
};
// in 2 read buffer chunks
let db = make_db();
let mut writer = TestLPWriter::default();
let writer_id: NonZeroU32 = NonZeroU32::new(1).unwrap();
let db = make_db(writer_id, Arc::clone(&store));
writer.write_lp_string(&db, data1).unwrap();
db.rollover_partition(partition_key).await.unwrap();
writer.write_lp_string(&db, data2).unwrap();

View File

@ -3,15 +3,18 @@ use data_types::{
database_rules::DatabaseRules,
DatabaseName,
};
use object_store::ObjectStore;
use query::Database;
use crate::{db::Db, JobRegistry};
use std::sync::Arc;
use std::{num::NonZeroU32, sync::Arc};
/// Used for testing: create a Database with a local store
pub fn make_db() -> Db {
pub fn make_db(server_id: NonZeroU32, object_store: Arc<ObjectStore>) -> Db {
Db::new(
DatabaseRules::new(DatabaseName::new("placeholder").unwrap()),
server_id,
object_store,
read_buffer::Database::new(),
None, // wal buffer
Arc::new(JobRegistry::new()),

View File

@ -11,6 +11,7 @@ use bytes::Bytes;
use observability_deps::tracing::{error, info};
use parking_lot::Mutex;
use snafu::{ResultExt, Snafu};
//use std::num::NonZeroU32;
use tokio::sync::oneshot;
use uuid::Uuid;
@ -292,11 +293,13 @@ cpu,host=B,region=east user=10.0,system=74.1 1
mem,host=A,region=west used=45 1
"#;
let db = make_db();
let mut writer = TestLPWriter::default();
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let writer_id = std::num::NonZeroU32::new(1).unwrap();
let db = make_db(writer_id, Arc::clone(&store));
writer.write_lp_string(&db, &lp).unwrap();
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let (tx, rx) = tokio::sync::oneshot::channel();
let mut metadata_path = store.new_path();
metadata_path.push_dir("meta");
@ -390,9 +393,11 @@ mem,host=A,region=west used=45 1
}
/// Create a Database with a local store
pub fn make_db() -> Db {
pub fn make_db(server_id: std::num::NonZeroU32, object_store: Arc<ObjectStore>) -> Db {
Db::new(
DatabaseRules::new(DatabaseName::new("placeholder").unwrap()),
server_id,
object_store,
ReadBufferDb::new(),
None, // wal buffer
Arc::new(JobRegistry::new()),

View File

@ -752,9 +752,11 @@ mod tests {
));
test_storage.set_id(NonZeroU32::new(1).unwrap()).unwrap();
test_storage
.create_database(DatabaseRules::new(
DatabaseName::new("MyOrg_MyBucket").unwrap(),
))
.create_database(
DatabaseRules::new(DatabaseName::new("MyOrg_MyBucket").unwrap()),
test_storage.require_id().unwrap(),
Arc::clone(&test_storage.store),
)
.await
.unwrap();
let server_url = test_server(Arc::clone(&test_storage));
@ -803,9 +805,11 @@ mod tests {
));
test_storage.set_id(NonZeroU32::new(1).unwrap()).unwrap();
test_storage
.create_database(DatabaseRules::new(
DatabaseName::new("MyOrg_MyBucket").unwrap(),
))
.create_database(
DatabaseRules::new(DatabaseName::new("MyOrg_MyBucket").unwrap()),
test_storage.require_id().unwrap(),
Arc::clone(&test_storage.store),
)
.await
.unwrap();
let server_url = test_server(Arc::clone(&test_storage));
@ -938,9 +942,11 @@ mod tests {
));
test_storage.set_id(NonZeroU32::new(1).unwrap()).unwrap();
test_storage
.create_database(DatabaseRules::new(
DatabaseName::new("MyOrg_MyBucket").unwrap(),
))
.create_database(
DatabaseRules::new(DatabaseName::new("MyOrg_MyBucket").unwrap()),
test_storage.require_id().unwrap(),
Arc::clone(&test_storage.store),
)
.await
.unwrap();
let server_url = test_server(Arc::clone(&test_storage));
@ -988,9 +994,11 @@ mod tests {
));
test_storage.set_id(NonZeroU32::new(1).unwrap()).unwrap();
test_storage
.create_database(DatabaseRules::new(
DatabaseName::new("MyOrg_MyBucket").unwrap(),
))
.create_database(
DatabaseRules::new(DatabaseName::new("MyOrg_MyBucket").unwrap()),
test_storage.require_id().unwrap(),
Arc::clone(&test_storage.store),
)
.await
.unwrap();
let server_url = test_server(Arc::clone(&test_storage));
@ -1040,7 +1048,14 @@ mod tests {
shard_config: None,
};
server.create_database(rules).await.unwrap();
server
.create_database(
rules,
server.require_id().unwrap(),
Arc::clone(&server.store),
)
.await
.unwrap();
let base_url = format!(
"{}/iox/api/v1/databases/{}/wal/meta",

View File

@ -92,7 +92,17 @@ where
.and_then(TryInto::try_into)
.map_err(|e| e.scope("rules"))?;
match self.server.create_database(rules).await {
let server_id = match self.server.require_id().ok() {
Some(id) => id,
None => return Err(NotFound::default().into()),
};
let object_store = Arc::clone(&self.server.store);
match self
.server
.create_database(rules, server_id, object_store)
.await
{
Ok(_) => Ok(Response::new(CreateDatabaseResponse {})),
Err(Error::DatabaseAlreadyExists { db_name }) => {
return Err(AlreadyExists {