Merge branch 'main' into dom/iox-api-client

pull/24376/head
Andrew Lamb 2021-01-12 13:19:49 -05:00 committed by GitHub
commit 38abe9735f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 292 additions and 60 deletions

4
Cargo.lock generated
View File

@ -3486,9 +3486,9 @@ checksum = "213701ba3370744dcd1a12960caa4843b3d68b4d1c0a5d575e0d65b2ee9d16c0"
[[package]]
name = "string-interner"
version = "0.12.1"
version = "0.12.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "969ae753ff8a0c6d71d6e3bd799e565be6a02fd68bbf70c252e1d084c8bd5764"
checksum = "383196d1876517ee6f9f0864d1fc1070331b803335d3c6daaa04bbcccd823c08"
dependencies = [
"cfg-if 1.0.0",
"hashbrown",

View File

@ -17,7 +17,7 @@ async-trait = "0.1"
chrono = "0.4"
flatbuffers = "0.6.1"
snafu = "0.6.2"
string-interner = "0.12.0"
string-interner = "0.12.2"
tokio = { version = "0.2", features = ["full"] }
tracing = "0.1"

View File

@ -52,7 +52,7 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Chunk {
/// The id for this chunk
pub id: u32,

View File

@ -26,9 +26,9 @@ pub enum Error {
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
/// Stores the actual data for columns in a chunk along with summary
/// statistics
#[derive(Debug, Clone)]
pub enum Column {
F64(Vec<Option<f64>>, Statistics<f64>),
I64(Vec<Option<i64>>, Statistics<i64>),

View File

@ -85,6 +85,12 @@ pub enum Error {
source: DataFusionError,
},
#[snafu(display("Error dropping chunk from partition '{}': {}", partition_key, source))]
DroppingChunk {
partition_key: String,
source: crate::partition::Error,
},
#[snafu(display("replicated write from writer {} missing payload", writer))]
MissingPayload { writer: u32 },
}
@ -170,6 +176,37 @@ impl MutableBufferDb {
let mut partition = partition.write().await;
Ok(partition.rollover_chunk())
}
/// Return the list of chunks, in order of id, for the specified
/// partition_key
pub async fn chunks(&self, partition_key: &str) -> Vec<Arc<Chunk>> {
self.get_partition(partition_key)
.await
.read()
.await
.chunks()
}
/// return the specified chunk from the partition
/// Returns None if no such chunk exists.
pub async fn get_chunk(&self, partition_key: &str, chunk_id: u32) -> Option<Arc<Chunk>> {
self.get_partition(partition_key)
.await
.read()
.await
.get_chunk(chunk_id)
.ok()
}
/// drop the the specified chunk from the partition
pub async fn drop_chunk(&self, partition_key: &str, chunk_id: u32) -> Result<Arc<Chunk>> {
self.get_partition(partition_key)
.await
.write()
.await
.drop_chunk(chunk_id)
.context(DroppingChunk { partition_key })
}
}
#[async_trait]

View File

@ -16,7 +16,7 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Dictionary(
StringInterner<DefaultSymbol, StringBackend<DefaultSymbol>, DefaultHashBuilder>,
);

View File

@ -11,7 +11,7 @@ use snafu::{ResultExt, Snafu};
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display(
"Error writing to open chunk of partition with key '{}': {}",
"Error writing to open chunk of partition with key '{}' in mutable buffer: {}",
partition_key,
source
))]
@ -21,7 +21,7 @@ pub enum Error {
},
#[snafu(display(
"Can not drop open chunk '{}' of partition with key '{}'",
"Can not drop open chunk '{}' of partition with key '{}' in mutable buffer",
chunk_id,
partition_key,
))]
@ -31,7 +31,17 @@ pub enum Error {
},
#[snafu(display(
"Can not drop unknown chunk '{}' of partition with key '{}'. Valid chunk ids: {:?}",
"Unknown chunk '{}' of partition with key '{}' in mutable buffer",
chunk_id,
partition_key,
))]
UnknownChunk {
partition_key: String,
chunk_id: u32,
},
#[snafu(display(
"Can not drop unknown chunk '{}' of partition with key '{}' in mutable buffer. Valid chunk ids: {:?}",
chunk_id,
partition_key,
valid_chunk_ids,
@ -113,14 +123,46 @@ impl Partition {
Ok(())
}
/// Return information about the chunks held in this partition
#[allow(dead_code)]
pub fn chunk_info(&self) -> PartitionChunkInfo {
PartitionChunkInfo {
num_closed_chunks: self.closed_chunks.len(),
/// Return the list of chunks, in order of id, in this
/// partition). A Snapshot of the currently active chunk is
/// returned. The snapshot will not be affected by future inserts
pub fn chunks(&self) -> Vec<Arc<Chunk>> {
let mut chunks: Vec<_> = self
.closed_chunks
.iter()
.map(|(_, chunk)| chunk.clone())
.collect::<Vec<_>>();
chunks.push(self.open_chunk_snapshot());
chunks
}
/// return the chunk by id. If the requested chunk is still open,
/// returns a snapshot of that chunk which will not be affected by
/// subsequent writes.
pub fn get_chunk(&self, chunk_id: u32) -> Result<Arc<Chunk>> {
if let Some(chunk) = self.closed_chunks.get(&chunk_id) {
Ok(chunk.clone())
} else if chunk_id == self.open_chunk.id {
Ok(self.open_chunk_snapshot())
} else {
UnknownChunk {
partition_key: &self.key,
chunk_id,
}
.fail()
}
}
/// Get a snapshot of the currently open chunk (that can be queried)
fn open_chunk_snapshot(&self) -> Arc<Chunk> {
// TODO the performance if cloning the chunk is terrible
// Proper performance is tracked in
// https://github.com/influxdata/influxdb_iox/issues/635
let open_chunk_snapshot = self.open_chunk.clone();
Arc::new(open_chunk_snapshot)
}
/// Close the currently open chunk and create a new open
/// chunk. The newly closed chunk is adding to the list of closed
/// chunks if it had data, and is returned.
@ -145,7 +187,6 @@ impl Partition {
/// Drop the specified chunk for the partition, returning a reference to the
/// chunk
#[allow(dead_code)]
pub fn drop_chunk(&mut self, chunk_id: u32) -> Result<Arc<Chunk>> {
self.closed_chunks.remove(&chunk_id).ok_or_else(|| {
let partition_key = self.key.clone();
@ -256,36 +297,21 @@ mod tests {
"| Boston | MA | 71.4 | 200 |",
"+--------+-------+------+------+",
];
assert_eq!(
partition.chunk_info(),
PartitionChunkInfo {
num_closed_chunks: 0
}
);
assert_eq!(partition.closed_chunks.len(), 0);
assert_table_eq!(expected, &dump_table(&partition, "h2o"));
println!("rolling over chunk");
// now rollover chunk, and expected results should be the same
let chunk = partition.rollover_chunk();
assert_eq!(
partition.chunk_info(),
PartitionChunkInfo {
num_closed_chunks: 1
}
);
assert_eq!(partition.closed_chunks.len(), 1);
assert_table_eq!(expected, &dump_table(&partition, "h2o"));
assert_eq!(row_count("h2o", &chunk), 2);
// calling rollover chunk again is ok; It is returned but not added to the
// closed chunk list
let chunk = partition.rollover_chunk();
assert_eq!(
partition.chunk_info(),
PartitionChunkInfo {
num_closed_chunks: 1
}
);
assert_eq!(partition.closed_chunks.len(), 1);
assert_table_eq!(expected, &dump_table(&partition, "h2o"));
assert_eq!(row_count("h2o", &chunk), 0);
}
@ -305,12 +331,7 @@ mod tests {
// now rollover chunk
let chunk = partition.rollover_chunk();
assert_eq!(
partition.chunk_info(),
PartitionChunkInfo {
num_closed_chunks: 1
}
);
assert_eq!(partition.closed_chunks.len(), 1);
assert_eq!(row_count("h2o", &chunk), 2);
load_data(
@ -339,12 +360,7 @@ mod tests {
// now rollover chunk again
let chunk = partition.rollover_chunk();
assert_eq!(
partition.chunk_info(),
PartitionChunkInfo {
num_closed_chunks: 2
}
);
assert_eq!(partition.closed_chunks.len(), 2);
assert_eq!(row_count("h2o", &chunk), 3);
assert_table_eq!(expected, &dump_table(&partition, "h2o"));
}
@ -378,24 +394,14 @@ mod tests {
"| Boston | MA | 72.4 | 200 |",
"+--------+-------+------+------+",
];
assert_eq!(
partition.chunk_info(),
PartitionChunkInfo {
num_closed_chunks: 0
}
);
assert_eq!(partition.closed_chunks.len(), 0);
assert_table_eq!(expected_h2o, &dump_table(&partition, "h2o"));
assert_table_eq!(expected_o2, &dump_table(&partition, "o2"));
// now rollover chunk again
let chunk = partition.rollover_chunk();
assert_eq!(
partition.chunk_info(),
PartitionChunkInfo {
num_closed_chunks: 1
}
);
assert_eq!(partition.closed_chunks.len(), 1);
assert_eq!(row_count("h2o", &chunk), 1);
assert_eq!(row_count("o2", &chunk), 2);
@ -524,7 +530,7 @@ mod tests {
let mut partition = Partition::new("a_key");
let e = partition.drop_chunk(0).unwrap_err();
assert_eq!(
"Can not drop open chunk '0' of partition with key 'a_key'",
"Can not drop open chunk '0' of partition with key 'a_key' in mutable buffer",
format!("{}", e)
);
@ -534,7 +540,7 @@ mod tests {
// can't drop again
let e = partition.drop_chunk(0).unwrap_err();
assert_eq!(
"Can not drop unknown chunk '0' of partition with key 'a_key'. Valid chunk ids: [1]",
"Can not drop unknown chunk '0' of partition with key 'a_key' in mutable buffer. Valid chunk ids: [1]",
format!("{}", e)
);
}
@ -621,6 +627,177 @@ mod tests {
assert!(chunk.time_closed.unwrap() < after_rollover);
}
#[tokio::test]
async fn test_list_chunks() {
// test Create Read Update and Delete for chunks
let mut partition = Partition::new("a_key");
load_data(&mut partition, &["h2o,state=MA,city=Boston temp=70.4 100"]).await;
assert_eq!(chunk_ids(&partition), vec![0]);
// roll the chunk over to make a new one
let chunk = partition.rollover_chunk();
assert_eq!(chunk.id(), 0);
assert_eq!(chunk_ids(&partition), vec![0, 1]);
load_data(&mut partition, &["h2o,state=MA,city=Boston temp=70.4 200"]).await;
let chunk = partition.rollover_chunk();
assert_eq!(chunk.id(), 1);
assert_eq!(chunk_ids(&partition), vec![0, 1, 2]);
// now delete chunk 1
partition.drop_chunk(1).unwrap();
assert_eq!(chunk_ids(&partition), vec![0, 2]);
// now delete chunk 0
partition.drop_chunk(0).unwrap();
assert_eq!(chunk_ids(&partition), vec![2]);
}
#[tokio::test]
async fn test_get_chunks() {
// test Create Read Update and Delete for chunks
let mut partition = Partition::new("a_key");
load_data(&mut partition, &["h2o,state=MA,city=Boston temp=70.4 100"]).await;
let expected0 = &[
"+--------+-------+------+------+",
"| city | state | temp | time |",
"+--------+-------+------+------+",
"| Boston | MA | 70.4 | 100 |",
"+--------+-------+------+------+",
];
assert_table_eq!(
expected0,
&dump_chunk_table(&partition.get_chunk(0).unwrap(), "h2o")
);
let res = partition.get_chunk(1);
assert_eq!(
res.unwrap_err().to_string(),
"Unknown chunk '1' of partition with key 'a_key' in mutable buffer"
);
let chunk = partition.rollover_chunk();
assert_table_eq!(expected0, &dump_chunk_table(&chunk, "h2o"));
assert_table_eq!(
expected0,
&dump_chunk_table(&partition.get_chunk(0).unwrap(), "h2o")
);
assert_eq!(
dump_chunk_table(&partition.get_chunk(1).unwrap(), "h2o").len(),
0
); // no records in chunk 1
// load data into chunk1 and ensure get_chunk still returns the parts
load_data(&mut partition, &["h2o,state=MA,city=Boston temp=72.4 200"]).await;
let expected1 = &[
"+--------+-------+------+------+",
"| city | state | temp | time |",
"+--------+-------+------+------+",
"| Boston | MA | 72.4 | 200 |",
"+--------+-------+------+------+",
];
assert_table_eq!(
expected0,
&dump_chunk_table(&partition.get_chunk(0).unwrap(), "h2o")
);
assert_table_eq!(
expected1,
&dump_chunk_table(&partition.get_chunk(1).unwrap(), "h2o")
);
}
#[tokio::test]
async fn test_drop_chunk_error() {
// test Create Read Update and Delete for chunks
let mut partition = Partition::new("a_key");
// can't drop non existent patition
let res = partition.drop_chunk(43);
assert_eq!(res.unwrap_err().to_string(), "Can not drop unknown chunk '43' of partition with key 'a_key' in mutable buffer. Valid chunk ids: [0]");
// can't drop open partition
let res = partition.drop_chunk(0);
assert_eq!(
res.unwrap_err().to_string(),
"Can not drop open chunk '0' of partition with key 'a_key' in mutable buffer"
);
// can't drop same partition twice
load_data(&mut partition, &["h2o,state=MA,city=Boston temp=70.4 100"]).await;
partition.rollover_chunk();
let res = partition.drop_chunk(0);
assert!(res.is_ok(), ":{:?}", res);
let res = partition.drop_chunk(0);
assert_eq!(res.unwrap_err().to_string(), "Can not drop unknown chunk '0' of partition with key 'a_key' in mutable buffer. Valid chunk ids: [1]");
}
#[tokio::test]
async fn test_chunk_snapshot() {
let mut partition = Partition::new("a_key");
// load data in
load_data(&mut partition, &["h2o,state=MA,city=Boston temp=71.4 100"]).await;
let expected0 = &[
"+--------+-------+------+------+",
"| city | state | temp | time |",
"+--------+-------+------+------+",
"| Boston | MA | 71.4 | 100 |",
"+--------+-------+------+------+",
];
let chunk0_snapshot0 = partition.get_chunk(0).unwrap();
assert_table_eq!(expected0, &dump_chunk_table(&chunk0_snapshot0, "h2o"));
// load a second row in
load_data(&mut partition, &["h2o,state=MA,city=Boston temp=72.4 200"]).await;
let expected1 = &[
"+--------+-------+------+------+",
"| city | state | temp | time |",
"+--------+-------+------+------+",
"| Boston | MA | 71.4 | 100 |",
"| Boston | MA | 72.4 | 200 |",
"+--------+-------+------+------+",
];
let chunk0_snapshot1 = partition.get_chunk(0).unwrap();
// old data is not changed
assert_table_eq!(expected0, &dump_chunk_table(&chunk0_snapshot0, "h2o"));
assert_table_eq!(expected1, &dump_chunk_table(&chunk0_snapshot1, "h2o"));
// load a third row in
load_data(&mut partition, &["h2o,state=MA,city=Boston temp=73.4 300"]).await;
let expected2 = &[
"+--------+-------+------+------+",
"| city | state | temp | time |",
"+--------+-------+------+------+",
"| Boston | MA | 71.4 | 100 |",
"| Boston | MA | 72.4 | 200 |",
"| Boston | MA | 73.4 | 300 |",
"+--------+-------+------+------+",
];
let chunk0_snapshot2 = partition.get_chunk(0).unwrap();
// old data is not changed
assert_table_eq!(expected0, &dump_chunk_table(&chunk0_snapshot0, "h2o"));
assert_table_eq!(expected1, &dump_chunk_table(&chunk0_snapshot1, "h2o"));
assert_table_eq!(expected2, &dump_chunk_table(&chunk0_snapshot2, "h2o"));
// even after rollover the snapshots produce the same results:
let chunk0_rollover = partition.rollover_chunk();
// old data remains unchanged
assert_table_eq!(expected0, &dump_chunk_table(&chunk0_snapshot0, "h2o"));
assert_table_eq!(expected1, &dump_chunk_table(&chunk0_snapshot1, "h2o"));
assert_table_eq!(expected2, &dump_chunk_table(&chunk0_snapshot2, "h2o"));
assert_table_eq!(expected2, &dump_chunk_table(&chunk0_rollover, "h2o"));
}
fn row_count(table_name: &str, chunk: &Chunk) -> u32 {
let stats = chunk.table_stats().unwrap();
for s in &stats {
@ -662,6 +839,15 @@ mod tests {
dst.into_iter().map(sort_record_batch).collect()
}
fn dump_chunk_table(chunk: &Chunk, table_name: &str) -> Vec<RecordBatch> {
let requested_columns = []; // empty ==> request all columns
let mut dst = vec![];
chunk
.table_to_arrow(&mut dst, table_name, &requested_columns)
.unwrap();
dst.into_iter().map(sort_record_batch).collect()
}
/// returns a list of all chunk ids in partition that are not empty
fn all_ids_with_data(partition: &Partition) -> Vec<u32> {
partition
@ -669,4 +855,13 @@ mod tests {
.filter_map(|c| if c.is_empty() { None } else { Some(c.id()) })
.collect()
}
/// Lists all chunk ids the partition by calling `chunks()`
fn chunk_ids(partition: &Partition) -> Vec<u32> {
partition
.chunks()
.iter()
.map(|c| c.id())
.collect::<Vec<_>>()
}
}

View File

@ -136,7 +136,7 @@ pub enum Error {
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Table {
/// Name of the table as a u32 in the chunk dictionary
pub id: u32,