chore: Merge branch 'main' into ntran/ingester

pull/24376/head
NGA-TRAN 2022-01-19 12:37:47 -05:00
commit 9d61580136
4 changed files with 289 additions and 5 deletions


@@ -104,7 +104,8 @@ CREATE TABLE IF NOT EXISTS iox_catalog.tombstone
min_time BIGINT NOT NULL,
max_time BIGINT NOT NULL,
serialized_predicate TEXT NOT NULL,
-PRIMARY KEY (id)
+PRIMARY KEY (id),
+CONSTRAINT tombstone_unique UNIQUE (table_id, sequencer_id, sequence_number)
);
CREATE TABLE IF NOT EXISTS iox_catalog.processed_tombstone
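The new `tombstone_unique` constraint is the piece the rest of this diff builds on: the Postgres implementation below upserts with `ON CONFLICT ON CONSTRAINT tombstone_unique`, and the in-memory implementation checks the same (table_id, sequencer_id, sequence_number) triple, so `create_or_get` is idempotent in both.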


@@ -161,6 +161,51 @@ impl PartitionId {
}
}
/// Unique ID for a `Tombstone`
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)]
#[sqlx(transparent)]
pub struct TombstoneId(i64);
#[allow(missing_docs)]
impl TombstoneId {
pub fn new(v: i64) -> Self {
Self(v)
}
pub fn get(&self) -> i64 {
self.0
}
}
/// A sequence number from a `Sequencer` (kafka partition)
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)]
#[sqlx(transparent)]
pub struct SequenceNumber(i64);
#[allow(missing_docs)]
impl SequenceNumber {
pub fn new(v: i64) -> Self {
Self(v)
}
pub fn get(&self) -> i64 {
self.0
}
}
/// A time in nanoseconds from epoch
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)]
#[sqlx(transparent)]
pub struct Timestamp(i64);
#[allow(missing_docs)]
impl Timestamp {
pub fn new(v: i64) -> Self {
Self(v)
}
pub fn get(&self) -> i64 {
self.0
}
}
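The three new types follow the transparent-newtype pattern already used by `PartitionId` above: `#[sqlx(transparent)]` lets each value bind and decode as its inner `i64`, while the distinct Rust types keep ids, sequence numbers, and timestamps from being swapped by accident. A minimal sketch of the idea, assuming the types above are in scope:

let sn = SequenceNumber::new(42);
let ts = Timestamp::new(1_000_000_000);
// The raw i64 is only reachable explicitly, via get():
assert_eq!(sn.get(), 42);
assert_eq!(ts.get(), 1_000_000_000);
// Mixing the types is a compile error even though both wrap an i64:
// let wrong: SequenceNumber = ts;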
/// Container that can return repos for each of the catalog data types.
#[async_trait]
pub trait RepoCollection {
@@ -178,6 +223,8 @@ pub trait RepoCollection {
fn sequencer(&self) -> Arc<dyn SequencerRepo + Sync + Send>;
/// repo for partitions
fn partition(&self) -> Arc<dyn PartitionRepo + Sync + Send>;
/// repo for tombstones
fn tombstone(&self) -> Arc<dyn TombstoneRepo + Sync + Send>;
}
/// Functions for working with Kafka topics in the catalog.
@@ -271,6 +318,30 @@ pub trait PartitionRepo {
async fn list_by_sequencer(&self, sequencer_id: SequencerId) -> Result<Vec<Partition>>;
}
/// Functions for working with tombstones in the catalog
#[async_trait]
pub trait TombstoneRepo {
/// create or get a tombstone
async fn create_or_get(
&self,
table_id: TableId,
sequencer_id: SequencerId,
sequence_number: SequenceNumber,
min_time: Timestamp,
max_time: Timestamp,
predicate: &str,
) -> Result<Tombstone>;
/// return all tombstones for the sequencer with a sequence number greater than that
/// passed in. This will be used by the ingester on startup to see what tombstones
/// might have to be applied to data that is read from the write buffer.
async fn list_tombstones_by_sequencer_greater_than(
&self,
sequencer_id: SequencerId,
sequence_number: SequenceNumber,
) -> Result<Vec<Tombstone>>;
}
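Per its doc comment, the list method targets ingester startup: replay from the write buffer resumes at some sequence number, and any tombstone recorded after that point may still apply to the replayed data. A hedged sketch of that call pattern (the function and the `resume_after` parameter are hypothetical, not part of this change):

async fn tombstones_to_apply<R: RepoCollection>(
    repo: &R,
    sequencer_id: SequencerId,
    resume_after: SequenceNumber, // hypothetical: where write-buffer replay resumes
) -> Result<Vec<Tombstone>> {
    // Only tombstones strictly after the resume point can still matter.
    repo.tombstone()
        .list_tombstones_by_sequencer_greater_than(sequencer_id, resume_after)
        .await
}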
/// Data object for a kafka topic
#[derive(Debug, Clone, Eq, PartialEq, sqlx::FromRow)]
pub struct KafkaTopic {
@@ -610,6 +681,25 @@ pub struct Partition {
pub partition_key: String,
}
/// Data object for a tombstone.
#[derive(Debug, Clone, PartialEq, sqlx::FromRow)]
pub struct Tombstone {
/// the id of the tombstone
pub id: TombstoneId,
/// the table the tombstone is associated with
pub table_id: TableId,
/// the sequencer the tombstone was sent through
pub sequencer_id: SequencerId,
/// the sequence number assigned to the tombstone from the sequencer
pub sequence_number: SequenceNumber,
/// the min time (inclusive) that the delete applies to
pub min_time: Timestamp,
/// the max time (exclusive) that the delete applies to
pub max_time: Timestamp,
/// the full delete predicate
pub serialized_predicate: String,
}
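Note the asymmetric bounds: `min_time` is inclusive, `max_time` exclusive. A hypothetical helper (not in this diff) makes the `[min_time, max_time)` convention concrete; it relies only on `Timestamp` deriving `PartialOrd`:

fn tombstone_covers(t: &Tombstone, row_time: Timestamp) -> bool {
    t.min_time <= row_time && row_time < t.max_time
}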
#[cfg(test)]
pub(crate) mod test_helpers {
use super::*;
@@ -627,6 +717,7 @@ pub(crate) mod test_helpers {
test_column(&new_repo()).await;
test_sequencer(&new_repo()).await;
test_partition(&new_repo()).await;
test_tombstone(&new_repo()).await;
}
async fn test_kafka_topic<T: RepoCollection + Send + Sync>(repo: &T) {
@@ -845,4 +936,78 @@ pub(crate) mod test_helpers {
assert_eq!(created, listed);
}
async fn test_tombstone<T: RepoCollection + Send + Sync>(repo: &T) {
let kafka = repo.kafka_topic().create_or_get("foo").await.unwrap();
let pool = repo.query_pool().create_or_get("foo").await.unwrap();
let namespace = repo
.namespace()
.create("namespace_tombstone_test", "inf", kafka.id, pool.id)
.await
.unwrap();
let table = repo
.table()
.create_or_get("test_table", namespace.id)
.await
.unwrap();
let other_table = repo
.table()
.create_or_get("other", namespace.id)
.await
.unwrap();
let sequencer = repo
.sequencer()
.create_or_get(&kafka, KafkaPartition::new(1))
.await
.unwrap();
let tombstone_repo = repo.tombstone();
let min_time = Timestamp::new(1);
let max_time = Timestamp::new(10);
let t1 = tombstone_repo
.create_or_get(
table.id,
sequencer.id,
SequenceNumber::new(1),
min_time,
max_time,
"whatevs",
)
.await
.unwrap();
assert!(t1.id > TombstoneId::new(0));
assert_eq!(t1.sequencer_id, sequencer.id);
assert_eq!(t1.sequence_number, SequenceNumber::new(1));
assert_eq!(t1.min_time, min_time);
assert_eq!(t1.max_time, max_time);
assert_eq!(t1.serialized_predicate, "whatevs");
let t2 = tombstone_repo
.create_or_get(
other_table.id,
sequencer.id,
SequenceNumber::new(2),
min_time,
max_time,
"bleh",
)
.await
.unwrap();
let t3 = tombstone_repo
.create_or_get(
table.id,
sequencer.id,
SequenceNumber::new(3),
min_time,
max_time,
"sdf",
)
.await
.unwrap();
let listed = tombstone_repo
.list_tombstones_by_sequencer_greater_than(sequencer.id, SequenceNumber::new(1))
.await
.unwrap();
assert_eq!(vec![t2, t3], listed);
}
}


@@ -4,8 +4,9 @@
use crate::interface::{
Column, ColumnId, ColumnRepo, ColumnType, Error, KafkaPartition, KafkaTopic, KafkaTopicId,
KafkaTopicRepo, Namespace, NamespaceId, NamespaceRepo, Partition, PartitionId, PartitionRepo,
-QueryPool, QueryPoolId, QueryPoolRepo, RepoCollection, Result, Sequencer, SequencerId,
-SequencerRepo, Table, TableId, TableRepo,
+QueryPool, QueryPoolId, QueryPoolRepo, RepoCollection, Result, SequenceNumber, Sequencer,
+SequencerId, SequencerRepo, Table, TableId, TableRepo, Timestamp, Tombstone, TombstoneId,
+TombstoneRepo,
};
use async_trait::async_trait;
use std::convert::TryFrom;
@@ -42,6 +43,7 @@ struct MemCollections {
columns: Vec<Column>,
sequencers: Vec<Sequencer>,
partitions: Vec<Partition>,
tombstones: Vec<Tombstone>,
}
impl RepoCollection for Arc<MemCatalog> {
@@ -72,6 +74,10 @@ impl RepoCollection for Arc<MemCatalog> {
fn partition(&self) -> Arc<dyn PartitionRepo + Sync + Send> {
Self::clone(self) as Arc<dyn PartitionRepo + Sync + Send>
}
fn tombstone(&self) -> Arc<dyn TombstoneRepo + Sync + Send> {
Self::clone(self) as Arc<dyn TombstoneRepo + Sync + Send>
}
}
#[async_trait]
@@ -332,6 +338,58 @@ impl PartitionRepo for MemCatalog {
}
}
#[async_trait]
impl TombstoneRepo for MemCatalog {
async fn create_or_get(
&self,
table_id: TableId,
sequencer_id: SequencerId,
sequence_number: SequenceNumber,
min_time: Timestamp,
max_time: Timestamp,
predicate: &str,
) -> Result<Tombstone> {
let mut collections = self.collections.lock().expect("mutex poisoned");
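// Look up an existing tombstone by the same (table_id, sequencer_id,
// sequence_number) triple the Postgres schema enforces via tombstone_unique.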
let tombstone = match collections.tombstones.iter().find(|t| {
t.table_id == table_id
&& t.sequencer_id == sequencer_id
&& t.sequence_number == sequence_number
}) {
Some(t) => t,
None => {
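// Ids are handed out sequentially from 1 in this in-memory catalog.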
let t = Tombstone {
id: TombstoneId::new(collections.tombstones.len() as i64 + 1),
table_id,
sequencer_id,
sequence_number,
min_time,
max_time,
serialized_predicate: predicate.to_string(),
};
collections.tombstones.push(t);
collections.tombstones.last().unwrap()
}
};
Ok(tombstone.clone())
}
async fn list_tombstones_by_sequencer_greater_than(
&self,
sequencer_id: SequencerId,
sequence_number: SequenceNumber,
) -> Result<Vec<Tombstone>> {
let collections = self.collections.lock().expect("mutex poisoned");
let tombstones: Vec<_> = collections
.tombstones
.iter()
.filter(|t| t.sequencer_id == sequencer_id && t.sequence_number > sequence_number)
.cloned()
.collect();
Ok(tombstones)
}
}
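Both implementations key on the same triple, so a repeated `create_or_get` is a lookup rather than a second insert. A hypothetical check of that invariant, written only against the traits in this diff:

async fn check_idempotent<R: RepoCollection>(
    repo: &R,
    table_id: TableId,
    sequencer_id: SequencerId,
) {
    let tombstones = repo.tombstone();
    let first = tombstones
        .create_or_get(table_id, sequencer_id, SequenceNumber::new(7),
                       Timestamp::new(1), Timestamp::new(2), "col = 'x'")
        .await
        .unwrap();
    let second = tombstones
        .create_or_get(table_id, sequencer_id, SequenceNumber::new(7),
                       Timestamp::new(1), Timestamp::new(2), "col = 'x'")
        .await
        .unwrap();
    // Same row, same id: the second call found the existing record.
    assert_eq!(first, second);
}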
#[cfg(test)]
mod tests {
use super::*;


@@ -3,8 +3,8 @@
use crate::interface::{
Column, ColumnRepo, ColumnType, Error, KafkaPartition, KafkaTopic, KafkaTopicId,
KafkaTopicRepo, Namespace, NamespaceId, NamespaceRepo, Partition, PartitionRepo, QueryPool,
-QueryPoolId, QueryPoolRepo, RepoCollection, Result, Sequencer, SequencerId, SequencerRepo,
-Table, TableId, TableRepo,
+QueryPoolId, QueryPoolRepo, RepoCollection, Result, SequenceNumber, Sequencer, SequencerId,
+SequencerRepo, Table, TableId, TableRepo, Timestamp, Tombstone, TombstoneRepo,
};
use async_trait::async_trait;
use observability_deps::tracing::info;
@@ -88,6 +88,10 @@ impl RepoCollection for Arc<PostgresCatalog> {
fn partition(&self) -> Arc<dyn PartitionRepo + Sync + Send> {
Self::clone(self) as Arc<dyn PartitionRepo + Sync + Send>
}
fn tombstone(&self) -> Arc<dyn TombstoneRepo + Sync + Send> {
Self::clone(self) as Arc<dyn TombstoneRepo + Sync + Send>
}
}
#[async_trait]
@@ -376,6 +380,58 @@ impl PartitionRepo for PostgresCatalog {
}
}
#[async_trait]
impl TombstoneRepo for PostgresCatalog {
async fn create_or_get(
&self,
table_id: TableId,
sequencer_id: SequencerId,
sequence_number: SequenceNumber,
min_time: Timestamp,
max_time: Timestamp,
predicate: &str,
) -> Result<Tombstone> {
sqlx::query_as::<_, Tombstone>(
r#"
INSERT INTO tombstone
( table_id, sequencer_id, sequence_number, min_time, max_time, serialized_predicate )
VALUES
( $1, $2, $3, $4, $5, $6 )
ON CONFLICT ON CONSTRAINT tombstone_unique
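-- no-op update so that RETURNING * yields the existing row on conflict
-- (DO NOTHING would return zero rows and break fetch_one)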
DO UPDATE SET table_id = tombstone.table_id RETURNING *;
"#,
)
.bind(&table_id) // $1
.bind(&sequencer_id) // $2
.bind(&sequence_number) // $3
.bind(&min_time) // $4
.bind(&max_time) // $5
.bind(predicate) // $6
.fetch_one(&self.pool)
.await
.map_err(|e| {
if is_fk_violation(&e) {
Error::ForeignKeyViolation { source: e }
} else {
Error::SqlxError { source: e }
}
})
}
async fn list_tombstones_by_sequencer_greater_than(
&self,
sequencer_id: SequencerId,
sequence_number: SequenceNumber,
) -> Result<Vec<Tombstone>> {
sqlx::query_as::<_, Tombstone>(r#"SELECT * FROM tombstone WHERE sequencer_id = $1 AND sequence_number > $2 ORDER BY id;"#)
.bind(&sequencer_id) // $1
.bind(&sequence_number) // $2
.fetch_all(&self.pool)
.await
.map_err(|e| Error::SqlxError { source: e })
}
}
/// The error code returned by Postgres for a unique constraint violation.
///
/// See <https://www.postgresql.org/docs/9.2/errcodes-appendix.html>
@@ -476,6 +532,10 @@ mod tests {
}
async fn clear_schema(pool: &Pool<Postgres>) {
sqlx::query("delete from tombstone;")
.execute(pool)
.await
.unwrap();
sqlx::query("delete from column_name;")
.execute(pool)
.await