feat: gRPC measurement_names support for read buffer (#697)

* test: Add tests for reading data from read buffer in different scenarios

* feat: gRPC measurement_names support for read buffer
Andrew Lamb 2021-01-26 13:15:56 -05:00 committed by GitHub
parent da4f8f30bf
commit df1367bcd1
2 changed files with 69 additions and 16 deletions
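
The first file below sets up four storage scenarios for line protocol data containing two measurements, cpu and disk, so a measurement_names request should return both names regardless of where the data lives. A minimal sketch of the kind of check this enables (for_each_scenario is a hypothetical helper, not part of this commit):

// Hypothetical test driver: run the same assertion against every DBScenario
// so open-chunk, closed-chunk, and read-buffer storage are all covered.
async fn for_each_scenario<F>(setup: &dyn DBSetup, check: F)
where
    F: Fn(&DBScenario),
{
    for scenario in setup.make().await {
        // e.g. assert that the measurement names are ["cpu", "disk"]
        check(&scenario);
    }
}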


@@ -737,20 +737,56 @@ mod test_influxrpc {
 #[async_trait]
 impl DBSetup for TwoMeasurements {
     async fn make(&self) -> Vec<DBScenario> {
-        let db = make_db();
+        let partition_key = "1970-01-01T00";
         let data = "cpu,region=west user=23.2 100\n\
                     cpu,region=west user=21.0 150\n\
                     disk,region=east bytes=99i 200";
+
+        let db = make_db();
         let mut writer = TestLPWriter::default();
         writer.write_lp_string(&db, data).await.unwrap();
-        vec![
-            DBScenario {
-                scenario_name: "Data in open chunk of mutable buffer".into(),
-                db,
-            },
-        ]
+        let scenario1 = DBScenario {
+            scenario_name: "Data in open chunk of mutable buffer".into(),
+            db,
+        };
+
+        // todo add a scenario where the database has had data loaded and then deleted
+        let db = make_db();
+        let mut writer = TestLPWriter::default();
+        writer.write_lp_string(&db, data).await.unwrap();
+        db.rollover_partition(partition_key).await.unwrap();
+        let scenario2 = DBScenario {
+            scenario_name: "Data in closed chunk of mutable buffer".into(),
+            db,
+        };
+
+        let db = make_db();
+        let mut writer = TestLPWriter::default();
+        writer.write_lp_string(&db, data).await.unwrap();
+        db.rollover_partition(partition_key).await.unwrap();
+        db.load_chunk_to_read_buffer(partition_key, 0)
+            .await
+            .unwrap();
+        let scenario3 = DBScenario {
+            scenario_name: "Data in both read buffer and mutable buffer".into(),
+            db,
+        };
+
+        let db = make_db();
+        let mut writer = TestLPWriter::default();
+        writer.write_lp_string(&db, data).await.unwrap();
+        db.rollover_partition(partition_key).await.unwrap();
+        db.load_chunk_to_read_buffer(partition_key, 0)
+            .await
+            .unwrap();
+        db.drop_mutable_buffer_chunk(partition_key, 0)
+            .await
+            .unwrap();
+        let scenario4 = DBScenario {
+            scenario_name: "Data in only read buffer and not mutable buffer".into(),
+            db,
+        };
+
+        vec![scenario1, scenario2, scenario3, scenario4]
     }
 }
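
Taken together, the four scenarios walk the same data through the chunk lifecycle, so a query tested against all of them is exercised against every place the data can live. A condensed view of that progression (each scenario above actually starts from a fresh db; this collapses the calls into one sequence for illustration):

let db = make_db();
writer.write_lp_string(&db, data).await.unwrap(); // scenario 1: open chunk in mutable buffer
db.rollover_partition(partition_key).await.unwrap(); // scenario 2: chunk closed in mutable buffer
db.load_chunk_to_read_buffer(partition_key, 0).await.unwrap(); // scenario 3: data in both buffers
db.drop_mutable_buffer_chunk(partition_key, 0).await.unwrap(); // scenario 4: data only in read buffer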


@@ -22,8 +22,11 @@ pub enum Error {
         source: mutable_buffer::chunk::Error,
     },

-    #[snafu(display("Read Buffer Chunk Error: {}", source))]
-    ReadBufferChunk { source: read_buffer::Error },
+    #[snafu(display("Read Buffer Error in chunk {}: {}", chunk_id, source))]
+    ReadBufferChunk {
+        source: read_buffer::Error,
+        chunk_id: u32,
+    },

     #[snafu(display("Internal Predicate Conversion Error: {}", source))]
     InternalPredicateConversion { source: super::pred::Error },
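
snafu generates a context selector struct with one field per non-source member, so adding chunk_id here means every .context(ReadBufferChunk { ... })? call site must now supply the id, which is then carried into the rendered error message. A self-contained sketch of the pattern (std::io::Error stands in for read_buffer::Error):

use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Read Buffer Error in chunk {}: {}", chunk_id, source))]
    ReadBufferChunk {
        source: std::io::Error, // stand-in for read_buffer::Error
        chunk_id: u32,
    },
}

fn read_chunk(chunk_id: u32) -> Result<Vec<u8>, Error> {
    // context() attaches chunk_id to the error if the read fails
    std::fs::read("chunk.bin").context(ReadBufferChunk { chunk_id })
}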
@@ -113,9 +116,10 @@ impl PartitionChunk for DBChunk {
                 partition_key,
                 chunk_id,
             } => {
+                let chunk_id = *chunk_id;
                 // Translate the predicates to ReadBuffer style
                 let predicate = PredicateBuilder::default().build();
-                let predicate =
+                let rb_predicate =
                     to_read_buffer_predicate(&predicate).context(InternalPredicateConversion)?;

                 // translate column selection
@@ -131,11 +135,11 @@ impl PartitionChunk for DBChunk {
                     .read_filter(
                         partition_key,
                         table_name,
-                        &[*chunk_id],
-                        predicate,
+                        &[chunk_id],
+                        rb_predicate,
                         column_selection,
                     )
-                    .context(ReadBufferChunk)?;
+                    .context(ReadBufferChunk { chunk_id })?;

                 // copy the RecordBatches into dst
                 dst.extend(read_result);
@@ -167,8 +171,21 @@ impl PartitionChunk for DBChunk {
                 make_scan_plan(batch).context(InternalPlanCreation)
             }
-            Self::ReadBuffer { .. } => {
-                unimplemented!("read buffer file not implemented")
+            Self::ReadBuffer {
+                db,
+                partition_key,
+                chunk_id,
+            } => {
+                let chunk_id = *chunk_id;
+                let rb_predicate =
+                    to_read_buffer_predicate(&predicate).context(InternalPredicateConversion)?;
+
+                let db = db.read().unwrap();
+                let batch = db
+                    .table_names(partition_key, &[chunk_id], rb_predicate)
+                    .context(ReadBufferChunk { chunk_id })?;
+
+                make_scan_plan(batch).context(InternalPlanCreation)
             }
             Self::ParquetFile => {
                 unimplemented!("parquet file not implemented")
             }