Merge pull request #3495 from influxdata/ntran/persist_merge_snapshots

refactor: add new fields and comments in ingest data buffer
pull/24376/head
kodiakhq[bot] 2022-01-19 23:20:25 +00:00 committed by GitHub
commit 362446b885
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 36 additions and 17 deletions

View File

@ -8,7 +8,7 @@ use uuid::Uuid;
use crate::server::IngesterServer;
use iox_catalog::interface::{
KafkaPartition, KafkaTopicId, NamespaceId, PartitionId, RepoCollection, SequenceNumber,
SequencerId, TableId,
SequencerId, TableId, Tombstone,
};
use mutable_batch::MutableBatch;
use parking_lot::RwLock;
@ -45,7 +45,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Ingester Data: a Map of Shard ID to its Data
#[derive(Default)]
struct Sequencers {
pub struct Sequencers {
// This map gets set up on initialization of the ingester so it won't ever be modified.
// The content of each SequenceData will get changed when more namespaces and tables
// get ingested.
@ -80,26 +80,26 @@ impl Sequencers {
/// Data of a Shard
#[derive(Default)]
struct SequencerData {
pub struct SequencerData {
// New namespaces can come in at any time so we need to be able to add new ones
namespaces: RwLock<BTreeMap<NamespaceId, Arc<NamespaceData>>>,
}
/// Data of a Namespace that belongs to a given Shard
#[derive(Default)]
struct NamespaceData {
pub struct NamespaceData {
tables: RwLock<BTreeMap<TableId, Arc<TableData>>>,
}
/// Data of a Table in a given Namespace that belongs to a given Shard
#[derive(Default)]
struct TableData {
pub struct TableData {
// Map of partition key to its data
partition_data: RwLock<BTreeMap<String, Arc<PartitionData>>>,
}
/// Data of an IOx Partition of a given Table of a Namespace that belongs to a given Shard
struct PartitionData {
pub struct PartitionData {
id: PartitionId,
inner: RwLock<DataBuffer>,
}
@ -127,9 +127,15 @@ struct PartitionData {
/// │ │ │ └───────────────────┘ │ │ └───────────────────┘ │
/// └────────────────────────┘ └────────────────────────┘ └─────────────────────────┘
#[derive(Default)]
struct DataBuffer {
/// Buffer of ingesting data
buffer: Vec<BufferBatch>,
pub struct DataBuffer {
/// Buffer of incoming writes
pub buffer: Vec<BufferBatch>,
/// Buffer of tombstones whose time range may overlap with this partition.
/// These tombstones will first be written into the Catalog and then here.
/// When a persist is called, these tombstones will be moved into the
/// PersistingBatch to get applied to that data.
pub deletes: Vec<Tombstone>,
/// Data in `buffer` will be moved to a `snapshot` when one of these happens:
/// . A background persist is called
@ -155,7 +161,7 @@ struct DataBuffer {
}
/// BufferBatch is a MutableBatch with its ingesting order, sequencer_number, that
/// helps the ingester keep the batches of data in their ingesting order
struct BufferBatch {
pub struct BufferBatch {
/// Sequencer number of the ingesting data
pub sequencer_number: SequenceNumber,
/// Ingesting data
@ -163,7 +169,7 @@ struct BufferBatch {
}
/// SnapshotBatch contains data of many contiguous BufferBatches
struct SnapshotBatch {
pub struct SnapshotBatch {
/// Min sequencer number of its combined BufferBatches
pub min_sequencer_number: SequenceNumber,
/// Max sequencer number of its combined BufferBatches
@ -174,10 +180,23 @@ struct SnapshotBatch {
/// PersistingBatch contains all needed info and data for creating
/// a parquet file for given set of SnapshotBatches
struct PersistingBatch {
sequencer_id: SequencerId,
table_id: TableId,
partition_id: PartitionId,
object_store_id: Uuid,
data: Vec<SnapshotBatch>,
pub struct PersistingBatch {
/// Sequencer id of the data
pub sequencer_id: SequencerId,
/// Table id of the data
pub table_id: TableId,
/// Partition id of the data
pub partition_id: PartitionId,
/// Id of to-be-created parquet file of this data
pub object_store_id: Uuid,
/// data to be persisted
pub data: Vec<SnapshotBatch>,
/// delete predicates to be applied to the data
/// before persisting
pub deletes: Vec<Tombstone>,
}