feat: handling empty compacted stream

pull/24376/head
Nga Tran 2021-11-30 18:13:36 -05:00
parent 6bb7a796e7
commit f53cdca010
2 changed files with 142 additions and 28 deletions


@@ -160,6 +160,35 @@ impl Storage {
Ok((path, file_size_bytes, md))
}
/// Write the given stream of data for a specified table of
/// a specified partitioned chunk to a parquet file in this storage
///
/// Returns `None` if the stream contains no data. Otherwise returns
/// the path to which the chunk was written, the file size in bytes,
/// and the parquet metadata
pub async fn write_to_object_store_if_having_data(
&self,
chunk_addr: ChunkAddr,
stream: SendableRecordBatchStream,
metadata: IoxMetadata,
) -> Result<Option<(ParquetFilePath, usize, IoxParquetMetaData)>> {
// Create full path location of this file in object store
let path = ParquetFilePath::new(&chunk_addr);
let schema = stream.schema();
let data = Self::parquet_stream_to_bytes(stream, schema, metadata).await?;
if data.is_empty() {
return Ok(None);
}
// TODO: make this work w/o cloning the byte vector (https://github.com/influxdata/influxdb_iox/issues/1504)
let file_size_bytes = data.len();
let md =
IoxParquetMetaData::from_file_bytes(data.clone()).context(ExtractingMetadataFailure)?;
self.to_object_store(data, &path).await?;
Ok(Some((path, file_size_bytes, md)))
}
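
For illustration, a caller of this new API would branch on the returned `Option`. A minimal sketch follows; the helper name `persist_if_nonempty` and its surrounding setup are assumptions for the example, not part of this commit:

    // Hypothetical helper: persist a compacted stream only if it produced data.
    async fn persist_if_nonempty(
        storage: &Storage,
        chunk_addr: ChunkAddr,
        stream: SendableRecordBatchStream,
        metadata: IoxMetadata,
    ) -> Result<Option<ParquetFilePath>> {
        match storage
            .write_to_object_store_if_having_data(chunk_addr, stream, metadata)
            .await?
        {
            // Data was written: return the object store path of the new parquet file
            Some((path, _file_size_bytes, _parquet_metadata)) => Ok(Some(path)),
            // The stream was empty: nothing was written
            None => Ok(None),
        }
    }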
fn writer_props(metadata_bytes: &[u8]) -> WriterProperties {
WriterProperties::builder()
.set_key_value_metadata(Some(vec![KeyValue {


@@ -90,7 +90,7 @@ pub(crate) fn compact_object_store_chunks(
// Step 3: Compact the os chunks into a stream
// No locks are held during compaction
let compacted_stream = compact_chunks(&db, &compacting_os_chunks.os_chunks).await?;
let compacted_rows;
let mut compacted_rows = 0;
let _schema = compacted_stream.schema;
let sort_key = compacted_stream.sort_key;
@@ -120,7 +120,6 @@ pub(crate) fn compact_object_store_chunks(
iox_metadata.clone(),
)
.await?;
compacted_rows = compacted_and_persisted_chunk.rows();
// Step 4.2: Update the preserved catalogs to use the newly created os_chunk
update_preserved_catalog(
@@ -140,11 +139,15 @@ pub(crate) fn compact_object_store_chunks(
&db,
&chunk_ids,
iox_metadata,
Arc::clone(&compacted_and_persisted_chunk),
compacted_and_persisted_chunk.clone(),
compacting_os_chunks.partition,
)
.await?;
if let Some(compacted_and_persisted_chunk) = compacted_and_persisted_chunk {
compacted_rows = compacted_and_persisted_chunk.rows();
}
// Log the summary
let elapsed = now.elapsed();
// input rows per second
@@ -158,7 +161,7 @@ pub(crate) fn compact_object_store_chunks(
rows_per_sec=?throughput,
"object store chunk(s) compacted");
Ok(Some(dbchunk)) // todo: consider returning Ok(None) when appropriate
Ok(dbchunk) // todo: consider returning Ok(None) when appropriate
};
Ok((tracker, fut.track(registration)))
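
As an aside on the `compacted_rows` bookkeeping above: the mutable zero default plus the later `if let Some` assignment could also be written as a single `Option::map_or` expression. A sketch only, assuming it runs while the chunk `Option` is still in scope:

    // Sketch: derive the row count directly from the optional compacted chunk
    let compacted_rows = compacted_and_persisted_chunk
        .as_ref()
        .map_or(0, |chunk| chunk.rows());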
@@ -360,18 +363,24 @@ async fn persist_stream_to_chunk<'a>(
partition_addr: &'a PartitionAddr,
stream: SendableRecordBatchStream,
iox_metadata: IoxMetadata,
) -> Result<Arc<ParquetChunk>> {
) -> Result<Option<Arc<ParquetChunk>>> {
// Create a storage to save data of this chunk
let storage = Storage::new(Arc::clone(&db.iox_object_store));
// Write the chunk stream data into a parquet file in the storage
let chunk_addr = ChunkAddr::new(partition_addr, iox_metadata.chunk_id);
let (path, file_size_bytes, parquet_metadata) = storage
.write_to_object_store(chunk_addr, stream, iox_metadata)
let written_result = storage
.write_to_object_store_if_having_data(chunk_addr, stream, iox_metadata)
.await
.context(WritingToObjectStore)?;
// the stream was empty
if written_result.is_none() {
return Ok(None);
}
// Create parquet chunk for the parquet file
let (path, file_size_bytes, parquet_metadata) = written_result.unwrap();
let parquet_metadata = Arc::new(parquet_metadata);
let metrics = ParquetChunkMetrics::new(db.metric_registry.as_ref());
let parquet_chunk = Arc::new(
@@ -387,14 +396,14 @@ async fn persist_stream_to_chunk<'a>(
.context(ParquetChunkError)?,
);
Ok(parquet_chunk)
Ok(Some(parquet_chunk))
}
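
The `is_none()` check followed by the later `unwrap()` in `persist_stream_to_chunk` is correct; an equivalent shape that avoids the `unwrap()` is a single `match` on the write result. Sketch only, reusing the bindings from the function above:

    // Sketch: unpack the Option returned by the storage layer without unwrap()
    let (path, file_size_bytes, parquet_metadata) = match written_result {
        Some(written) => written,
        // The stream was empty, so no parquet file exists to build a chunk from
        None => return Ok(None),
    };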
/// Update the preserved catalog: replace compacted chunks with a newly persisted chunk
async fn update_preserved_catalog(
db: &Db,
commpated_parquet_file_paths: &[ParquetFilePath],
parquet_chunk: &Arc<ParquetChunk>,
parquet_chunk: &Option<Arc<ParquetChunk>>,
) -> Result<()> {
// Open transaction
let mut transaction = db.preserved_catalog.open_transaction().await;
@@ -404,13 +413,15 @@ async fn update_preserved_catalog(
transaction.remove_parquet(parquet_file_path);
}
// Add the new chunk
let catalog_parquet_info = CatalogParquetInfo {
path: parquet_chunk.path().clone(),
file_size_bytes: parquet_chunk.file_size_bytes(),
metadata: parquet_chunk.parquet_metadata(),
};
transaction.add_parquet(&catalog_parquet_info);
// Add new chunk if compaction returns some data
if let Some(parquet_chunk) = parquet_chunk {
let catalog_parquet_info = CatalogParquetInfo {
path: parquet_chunk.path().clone(),
file_size_bytes: parquet_chunk.file_size_bytes(),
metadata: parquet_chunk.parquet_metadata(),
};
transaction.add_parquet(&catalog_parquet_info);
}
// Close/commit the transaction
transaction.commit().await.context(CommitError)?;
@@ -424,9 +435,9 @@ async fn update_in_memory_catalog(
db: &Arc<Db>,
chunk_ids: &[ChunkId],
iox_metadata: IoxMetadata,
parquet_chunk: Arc<ParquetChunk>,
parquet_chunk: Option<Arc<ParquetChunk>>,
partition: Arc<RwLock<Partition>>,
) -> Result<Arc<DbChunk>> {
) -> Result<Option<Arc<DbChunk>>> {
// Collect delete predicates added during compaction
let delete_handle = db.delete_predicates_mailbox.consume().await;
let mut delete_predicates = vec![];
@@ -448,16 +459,22 @@ async fn update_in_memory_catalog(
.expect("There was a lifecycle action attached to this chunk, who deleted it?!");
}
// Insert new compacted and persisted chunk
let chunk = partition.insert_object_store_only_chunk(
iox_metadata.chunk_id,
parquet_chunk,
iox_metadata.time_of_first_write,
iox_metadata.time_of_last_write,
delete_predicates,
iox_metadata.chunk_order,
);
let dbchunk = DbChunk::parquet_file_snapshot(&*chunk.read());
// Only create a new chunk if compaction returns rows
let dbchunk = match parquet_chunk {
Some(parquet_chunk) => {
let chunk = partition.insert_object_store_only_chunk(
iox_metadata.chunk_id,
parquet_chunk,
iox_metadata.time_of_first_write,
iox_metadata.time_of_last_write,
delete_predicates,
iox_metadata.chunk_order,
);
let dbchunk = DbChunk::parquet_file_snapshot(&*chunk.read());
Some(dbchunk)
}
None => None,
};
// drop partition lock
std::mem::drop(partition);
@@ -796,6 +813,74 @@ mod tests {
assert_eq!(summary_chunks[1].row_count, 2);
}
#[ignore]
#[tokio::test]
async fn test_compact_os_on_chunk_delete_all() {
test_helpers::maybe_start_logging();
let db = make_db().await.db;
let partition_key = "1970-01-01T00";
write_lp(&db, "cpu,tag1=cupcakes bar=1 10");
write_lp(&db, "cpu,tag1=cookies bar=2 10"); // delete
// persist chunk 1
let _chunk_id_1 = db
.persist_partition("cpu", partition_key, true)
.await
.unwrap()
.unwrap()
.id();
// Delete all
let predicate = Arc::new(DeletePredicate {
range: TimestampRange { start: 0, end: 30 },
exprs: vec![],
});
db.delete("cpu", predicate).unwrap();
//
// Add a MUB
write_lp(db.as_ref(), "cpu,tag1=brownies,tag2=a bar=2 40");
// Verify results before OS compacting
let partition = db.lockable_partition("cpu", partition_key).unwrap();
let partition = partition.read();
let chunks = LockablePartition::chunks(&partition);
assert_eq!(chunks.len(), 2);
// chunk summary
let mut summary_chunks: Vec<_> = partition.chunk_summaries().collect();
assert_eq!(summary_chunks.len(), 2);
summary_chunks.sort_by_key(|c| c.storage);
assert_eq!(summary_chunks[0].storage, ChunkStorage::OpenMutableBuffer);
assert_eq!(summary_chunks[0].row_count, 1);
assert_eq!(
summary_chunks[1].storage,
ChunkStorage::ReadBufferAndObjectStore
);
assert_eq!(summary_chunks[1].row_count, 2);
// compact the only OS chunk
let partition = partition.upgrade();
let chunk1 = chunks[0].write();
let compacted_chunk = compact_object_store_chunks(partition, vec![chunk1])
.unwrap()
.1
.await
.unwrap();
let err = compacted_chunk.unwrap_err();
println!("{}", err.to_string());
// verify results
let partition = db.partition("cpu", partition_key).unwrap();
let mut summary_chunks: Vec<_> = partition.read().chunk_summaries().collect();
summary_chunks.sort_by_key(|c| c.storage);
// Should only have MUB chunk
assert_eq!(summary_chunks.len(), 1);
assert_eq!(summary_chunks[0].storage, ChunkStorage::OpenMutableBuffer);
assert_eq!(summary_chunks[0].row_count, 1);
}
// todo: add tests
// . compact with deletes happening during compaction
// . compact with deletes/duplicates that lead to empty result