feat: Initial SQLite catalog schema (#6851)

* feat: Initial SQLite catalog schema

* chore: Run cargo hakari tasks

* feat: impls, many TODOs

* feat: completed `todo!()`s

* chore: add remaining tests from postgres module

* feat: add SQLite to get_catalog API

* chore: Add docs

* chore: Placate clippy

* chore: Placate clippy

* chore: PR feedback from @domodwyer

---------

Co-authored-by: CircleCI[bot] <circleci@influxdata.com>
Stuart Carnie 2023-02-07 08:55:14 +10:00 committed by GitHub
parent 452b69cd2c
commit eb245d6774
10 changed files with 3293 additions and 18 deletions

Cargo.lock (generated)

@@ -1822,6 +1822,18 @@ dependencies = [
"num-traits",
]
[[package]]
name = "flume"
version = "0.10.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577"
dependencies = [
"futures-core",
"futures-sink",
"pin-project",
"spin 0.9.4",
]
[[package]]
name = "fnv"
version = "1.0.7"
@@ -2820,9 +2832,11 @@ dependencies = [
"mutable_batch",
"mutable_batch_lp",
"observability_deps",
"parking_lot 0.12.1",
"paste",
"pretty_assertions",
"rand",
"serde",
"snafu",
"sqlx",
"sqlx-hotswap-pool",
@@ -3316,6 +3330,17 @@ version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "348108ab3fba42ec82ff6e9564fc4ca0247bdccdc68dd8af9764bbc79c3c8ffb"
[[package]]
name = "libsqlite3-sys"
version = "0.24.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "898745e570c7d0453cc1fbc4a701eb6c662ed54e8fec8b7d14be137ebeeb9d14"
dependencies = [
"cc",
"pkg-config",
"vcpkg",
]
[[package]]
name = "link-cplusplus"
version = "1.0.8"
@@ -5442,8 +5467,10 @@ dependencies = [
"dotenvy",
"either",
"event-listener",
"flume",
"futures-channel",
"futures-core",
"futures-executor",
"futures-intrusive",
"futures-util",
"hashlink",
@@ -5453,6 +5480,7 @@ dependencies = [
"indexmap",
"itoa 1.0.5",
"libc",
"libsqlite3-sys",
"log",
"md-5",
"memchr",
@@ -6372,6 +6400,12 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "vcpkg"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
[[package]]
name = "version_check"
version = "0.9.4"
@@ -6724,6 +6758,7 @@ dependencies = [
"flate2",
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",


@@ -1,4 +1,5 @@
//! Catalog-DSN-related configs.
use iox_catalog::sqlite::{SqliteCatalog, SqliteConnectionOptions};
use iox_catalog::{
create_or_get_default_records,
interface::Catalog,
@@ -15,6 +16,9 @@ pub enum Error {
#[snafu(display("A Postgres connection string in --catalog-dsn is required."))]
ConnectionStringRequired,
#[snafu(display("A SQLite connection string in --catalog-dsn is required."))]
ConnectionStringSqliteRequired,
#[snafu(display("A catalog error occurred: {}", source))]
Catalog {
source: iox_catalog::interface::Error,
@@ -44,7 +48,7 @@ fn default_hotswap_poll_interval_timeout() -> &'static str {
}
/// CLI config for catalog DSN.
#[derive(Debug, Clone, clap::Parser)]
#[derive(Debug, Clone, Default, clap::Parser)]
pub struct CatalogDsnConfig {
/// The type of catalog to use. "memory" is only useful for testing purposes.
#[clap(
@@ -110,13 +114,17 @@ pub struct CatalogDsnConfig {
}
/// Catalog type.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, clap::ValueEnum)]
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq, PartialOrd, Ord, clap::ValueEnum)]
pub enum CatalogType {
/// PostgreSQL.
#[default]
Postgres,
/// In-memory.
Memory,
/// SQLite.
Sqlite,
}
impl CatalogDsnConfig {
@@ -127,12 +135,7 @@ impl CatalogDsnConfig {
Self {
catalog_type_: CatalogType::Memory,
dsn: None,
max_catalog_connections: PostgresConnectionOptions::DEFAULT_MAX_CONNS,
postgres_schema_name: PostgresConnectionOptions::DEFAULT_SCHEMA_NAME.to_string(),
connect_timeout: PostgresConnectionOptions::DEFAULT_CONNECT_TIMEOUT,
idle_timeout: PostgresConnectionOptions::DEFAULT_IDLE_TIMEOUT,
hotswap_poll_interval: PostgresConnectionOptions::DEFAULT_HOTSWAP_POLL_INTERVAL,
..Self::default()
}
}
@@ -151,6 +154,17 @@ impl CatalogDsnConfig {
}
}
/// Create a new SQLite instance for all-in-one mode if a catalog DSN is specified
pub fn new_sqlite(dsn: String) -> Self {
info!("Catalog: SQLite at `{}`", dsn);
Self {
catalog_type_: CatalogType::Sqlite,
dsn: Some(dsn),
..Self::default()
}
}
/// Get config-dependent catalog.
pub async fn get_catalog(
&self,
@@ -189,6 +203,20 @@ impl CatalogDsnConfig {
Arc::new(mem) as Arc<dyn Catalog>
}
CatalogType::Sqlite => {
let options = SqliteConnectionOptions {
dsn: self
.dsn
.as_ref()
.context(ConnectionStringSqliteRequiredSnafu)?
.clone(),
};
Arc::new(
SqliteCatalog::connect(options, metrics)
.await
.context(CatalogSnafu)?,
) as Arc<dyn Catalog>
}
};
Ok(catalog)
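
With the new `CatalogType::Sqlite` arm, a SQLite-backed catalog comes through the same `get_catalog` path as Postgres. A minimal sketch of the call site, assuming `get_catalog`'s existing app-name and metrics-registry parameters; the DSN value and app name are illustrative:

use std::sync::Arc;

// Sketch only: new_sqlite() fills the DSN, get_catalog() builds the SqliteCatalog.
let config = CatalogDsnConfig::new_sqlite("iox_catalog.sqlite3".to_string());
let metrics = Arc::new(metric::Registry::default());
let catalog = config.get_catalog("example_app", metrics).await?;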


@@ -854,11 +854,8 @@ impl From<&str> for PartitionKey {
}
}
impl<DB> sqlx::Type<DB> for PartitionKey
where
DB: sqlx::Database<TypeInfo = sqlx::postgres::PgTypeInfo>,
{
fn type_info() -> DB::TypeInfo {
impl sqlx::Type<sqlx::Postgres> for PartitionKey {
fn type_info() -> sqlx::postgres::PgTypeInfo {
// Store this type as VARCHAR
sqlx::postgres::PgTypeInfo::with_name("VARCHAR")
}
@@ -883,6 +880,31 @@ impl sqlx::Decode<'_, sqlx::Postgres> for PartitionKey {
}
}
impl sqlx::Type<sqlx::Sqlite> for PartitionKey {
fn type_info() -> sqlx::sqlite::SqliteTypeInfo {
<String as sqlx::Type<sqlx::Sqlite>>::type_info()
}
}
impl sqlx::Encode<'_, sqlx::Sqlite> for PartitionKey {
fn encode_by_ref(
&self,
buf: &mut <sqlx::Sqlite as sqlx::database::HasArguments<'_>>::ArgumentBuffer,
) -> sqlx::encode::IsNull {
<String as sqlx::Encode<sqlx::Sqlite>>::encode(self.0.to_string(), buf)
}
}
impl sqlx::Decode<'_, sqlx::Sqlite> for PartitionKey {
fn decode(
value: <sqlx::Sqlite as sqlx::database::HasValueRef<'_>>::ValueRef,
) -> Result<Self, Box<dyn std::error::Error + 'static + Send + Sync>> {
Ok(Self(
<String as sqlx::Decode<sqlx::Sqlite>>::decode(value)?.into(),
))
}
}
/// Data object for a partition. The combination of shard, table and key is unique (i.e. only
/// one record can exist for each combo)
#[derive(Debug, Clone, PartialEq, Eq, sqlx::FromRow)]
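
With `Type`, `Encode`, and `Decode` now implemented for `sqlx::Sqlite`, a `PartitionKey` can be bound and decoded directly, with no manual `String` round-trip at call sites. A hedged sketch (the `pool` and the query are illustrative, not from this diff; the `partition` table comes from the migration below):

// Sketch: binding the newtype directly against SQLite.
let key = PartitionKey::from("2023-01-01");
let id: Option<(i64,)> =
    sqlx::query_as("SELECT id FROM partition WHERE partition_key = $1")
        .bind(&key)
        .fetch_optional(&pool)
        .await?;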

iox_catalog/.gitignore (new file)

@@ -0,0 +1 @@
iox_catalog.sqlite3


@@ -14,8 +14,10 @@ log = "0.4"
metric = { version = "0.1.0", path = "../metric" }
mutable_batch = { path = "../mutable_batch" }
observability_deps = { path = "../observability_deps" }
parking_lot = { version = "0.12" }
serde = { version = "1.0", features = ["derive"] }
snafu = "0.7"
sqlx = { version = "0.6", features = [ "runtime-tokio-rustls" , "postgres", "uuid" ] }
sqlx = { version = "0.6", features = [ "runtime-tokio-rustls" , "postgres", "uuid", "sqlite" ] }
sqlx-hotswap-pool = { path = "../sqlx-hotswap-pool" }
thiserror = "1.0.38"
tokio = { version = "1.25", features = ["io-util", "macros", "parking_lot", "rt-multi-thread", "time"] }
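
Enabling sqlx's `sqlite` feature is what pulls `libsqlite3-sys`, `flume`, and `futures-executor` into Cargo.lock above (sqlx 0.6 drives SQLite through a background worker thread). A minimal sketch of opening a pool with this feature set, assuming sqlx 0.6's SQLite API; the file name is illustrative and matches the new .gitignore entry:

use std::str::FromStr;
use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};

// Sketch only: create the database file on first use and open a small pool.
let opts = SqliteConnectOptions::from_str("sqlite://iox_catalog.sqlite3")?
    .create_if_missing(true);
let pool = SqlitePoolOptions::new()
    .max_connections(5)
    .connect_with(opts)
    .await?;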


@@ -0,0 +1,233 @@
create table if not exists topic
(
id INTEGER not null
constraint kafka_topic_pkey
primary key autoincrement,
name VARCHAR not null
constraint topic_name_unique unique
);
create table if not exists query_pool
(
id INTEGER NOT NULL
constraint query_pool_pkey
primary key autoincrement,
name varchar not null
constraint query_pool_name_unique
unique
);
create table if not exists namespace
(
id INTEGER
constraint namespace_pkey
primary key autoincrement,
name varchar not null
constraint namespace_name_unique
unique,
topic_id numeric not null
constraint namespace_kafka_topic_id_fkey
references topic,
query_pool_id numeric not null
references query_pool,
max_tables integer default 10000 not null,
max_columns_per_table integer default 200 not null,
retention_period_ns numeric
);
create table if not exists table_name
(
id INTEGER
constraint table_name_pkey
primary key autoincrement,
namespace_id numeric not null
references namespace
on delete cascade,
name varchar not null,
constraint table_name_unique
unique (namespace_id, name)
);
create index if not exists table_name_namespace_idx
on table_name (namespace_id);
create table if not exists column_name
(
id INTEGER
constraint column_name_pkey
primary key autoincrement,
table_id numeric not null
references table_name
on delete cascade,
name varchar not null,
column_type smallint not null,
constraint column_name_unique
unique (table_id, name)
);
create index if not exists column_name_table_idx
on column_name (table_id);
create table if not exists shard
(
id INTEGER
constraint sequencer_pkey
primary key autoincrement,
topic_id numeric not null
constraint sequencer_kafka_topic_id_fkey
references topic,
shard_index integer not null,
min_unpersisted_sequence_number numeric,
constraint shard_unique
unique (topic_id, shard_index)
);
create table if not exists sharding_rule_override
(
id INTEGER
constraint sharding_rule_override_pkey
primary key autoincrement,
namespace_id numeric not null
references namespace,
table_id numeric not null
references table_name,
column_id numeric not null
references column_name
);
create table if not exists partition
(
id INTEGER
constraint partition_pkey
primary key autoincrement,
shard_id numeric not null
constraint partition_sequencer_id_fkey
references shard,
table_id numeric not null
references table_name
on delete cascade,
partition_key varchar not null,
sort_key text [] not null,
persisted_sequence_number numeric,
to_delete numeric,
new_file_at numeric,
constraint partition_key_unique
unique (table_id, partition_key)
);
create table if not exists parquet_file
(
id INTEGER
constraint parquet_file_pkey
primary key autoincrement,
shard_id numeric not null
constraint parquet_file_sequencer_id_fkey
references shard,
table_id numeric not null
references table_name,
partition_id numeric not null
references partition,
object_store_id uuid not null
constraint parquet_location_unique
unique,
max_sequence_number numeric,
min_time numeric,
max_time numeric,
to_delete numeric,
row_count numeric default 0 not null,
file_size_bytes numeric default 0 not null,
compaction_level smallint default 0 not null,
created_at numeric,
namespace_id numeric not null
references namespace
on delete cascade,
column_set numeric[] not null,
max_l0_created_at numeric default 0 not null
);
create index if not exists parquet_file_deleted_at_idx
on parquet_file (to_delete);
create index if not exists parquet_file_partition_idx
on parquet_file (partition_id);
create index if not exists parquet_file_table_idx
on parquet_file (table_id);
create index if not exists parquet_file_shard_compaction_delete_idx
on parquet_file (shard_id, compaction_level, to_delete);
create index if not exists parquet_file_shard_compaction_delete_created_idx
on parquet_file (shard_id, compaction_level, to_delete, created_at);
create index if not exists parquet_file_partition_created_idx
on parquet_file (partition_id, created_at);
create table if not exists tombstone
(
id INTEGER
constraint tombstone_pkey
primary key autoincrement,
table_id numeric not null
references table_name
on delete cascade,
shard_id numeric not null
constraint tombstone_sequencer_id_fkey
references shard,
sequence_number numeric not null,
min_time numeric not null,
max_time numeric not null,
serialized_predicate text not null,
constraint tombstone_unique
unique (table_id, shard_id, sequence_number)
);
create table if not exists processed_tombstone
(
tombstone_id INTEGER not null
references tombstone,
parquet_file_id numeric not null
references parquet_file
on delete cascade,
primary key (tombstone_id, parquet_file_id)
);
create table if not exists skipped_compactions
(
partition_id INTEGER not null
constraint skipped_compactions_pkey
primary key
references partition
on delete cascade,
reason text not null,
skipped_at numeric not null,
num_files numeric,
limit_num_files numeric,
estimated_bytes numeric,
limit_bytes numeric,
limit_num_files_first_in_partition numeric
);
create table if not exists billing_summary
(
namespace_id integer not null
constraint billing_summary_pkey
primary key
references namespace
on delete cascade,
total_file_size_bytes numeric not null
);
create index if not exists billing_summary_namespace_idx
on billing_summary (namespace_id);
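
The schema is a direct port of the Postgres catalog: `primary key autoincrement` (legal in SQLite only on columns declared exactly `INTEGER`) stands in for Postgres sequences, while Postgres-flavoured declared types such as `numeric`, `uuid`, and `text []` are kept as-is and fall back to SQLite's type-affinity rules. A hedged sketch of applying the migrations with sqlx (the directory path is an assumption; the macro resolves it relative to the crate root):

// Sketch: embed and run the SQLite migrations at connect time.
static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!("sqlite/migrations");

MIGRATOR.run(&pool).await?;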


@@ -0,0 +1,31 @@
create trigger if not exists update_partition
after insert
on parquet_file
for each row
when NEW.compaction_level < 2
begin
UPDATE partition set new_file_at = NEW.created_at WHERE id = NEW.partition_id;
end;
create trigger if not exists update_billing
after insert
on parquet_file
for each row
begin
INSERT INTO billing_summary (namespace_id, total_file_size_bytes)
VALUES (NEW.namespace_id, NEW.file_size_bytes)
ON CONFLICT (namespace_id) DO UPDATE
SET total_file_size_bytes = billing_summary.total_file_size_bytes + NEW.file_size_bytes
WHERE billing_summary.namespace_id = NEW.namespace_id;
end;
create trigger if not exists decrement_summary
after update
on parquet_file
for each row
when OLD.to_delete IS NULL AND NEW.to_delete IS NOT NULL
begin
UPDATE billing_summary
SET total_file_size_bytes = billing_summary.total_file_size_bytes - OLD.file_size_bytes
WHERE billing_summary.namespace_id = OLD.namespace_id;
end;
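
These triggers replace the Postgres trigger functions with inline SQLite trigger bodies; `update_billing` maintains `billing_summary` as an upsert-kept running total per namespace. A hedged sketch of the observable effect (ids are illustrative, and SQLite enforces the foreign keys only when `PRAGMA foreign_keys` is enabled):

// Sketch: inserting a parquet_file row fires update_billing, which upserts
// the namespace's running total -- no separate accounting query is needed.
sqlx::query(
    "INSERT INTO parquet_file \
       (shard_id, table_id, partition_id, object_store_id, namespace_id, \
        column_set, file_size_bytes) \
     VALUES (1, 1, 1, $1, 1, '[]', 1024)",
)
.bind(uuid::Uuid::new_v4().to_string())
.execute(&pool)
.await?;

let total: i64 = sqlx::query_scalar(
    "SELECT total_file_size_bytes FROM billing_summary WHERE namespace_id = 1",
)
.fetch_one(&pool)
.await?;
assert_eq!(total, 1024);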


@@ -42,6 +42,7 @@ pub mod interface;
pub mod mem;
pub mod metrics;
pub mod postgres;
pub mod sqlite;
/// An [`crate::interface::Error`] scoped to a single table for schema validation errors.
#[derive(Debug, Error)]

iox_catalog/src/sqlite.rs (new file, 2920 additions)

File diff suppressed because it is too large.


@@ -37,6 +37,7 @@ flatbuffers = { version = "23", features = ["std"] }
flate2 = { version = "1", features = ["miniz_oxide", "rust_backend"] }
futures-channel = { version = "0.3", features = ["alloc", "futures-sink", "sink", "std"] }
futures-core = { version = "0.3", features = ["alloc", "std"] }
futures-executor = { version = "0.3", features = ["std"] }
futures-io = { version = "0.3", features = ["std"] }
futures-sink = { version = "0.3", features = ["alloc", "std"] }
futures-task = { version = "0.3", default-features = false, features = ["alloc", "std"] }
@@ -74,8 +75,8 @@ serde_json = { version = "1", features = ["raw_value", "std"] }
sha2 = { version = "0.10", features = ["std"] }
similar = { version = "2", features = ["inline", "text"] }
smallvec = { version = "1", default-features = false, features = ["union"] }
sqlx = { version = "0.6", features = ["_rt-tokio", "json", "macros", "migrate", "postgres", "runtime-tokio-rustls", "sqlx-macros", "tls", "uuid"] }
sqlx-core = { version = "0.6", default-features = false, features = ["_rt-tokio", "_tls-rustls", "any", "base64", "crc", "dirs", "hkdf", "hmac", "json", "md-5", "migrate", "postgres", "rand", "runtime-tokio-rustls", "rustls", "rustls-pemfile", "serde", "serde_json", "sha1", "sha2", "tokio-stream", "uuid", "webpki-roots", "whoami"] }
sqlx = { version = "0.6", features = ["_rt-tokio", "json", "macros", "migrate", "postgres", "runtime-tokio-rustls", "sqlite", "sqlx-macros", "tls", "uuid"] }
sqlx-core = { version = "0.6", default-features = false, features = ["_rt-tokio", "_tls-rustls", "any", "base64", "crc", "dirs", "flume", "futures-executor", "hkdf", "hmac", "json", "libsqlite3-sys", "md-5", "migrate", "postgres", "rand", "runtime-tokio-rustls", "rustls", "rustls-pemfile", "serde", "serde_json", "sha1", "sha2", "sqlite", "tokio-stream", "uuid", "webpki-roots", "whoami"] }
thrift = { version = "0.17", features = ["log", "server", "threadpool"] }
tokio = { version = "1", features = ["bytes", "fs", "io-std", "io-util", "libc", "macros", "memchr", "mio", "net", "num_cpus", "parking_lot", "rt", "rt-multi-thread", "signal", "signal-hook-registry", "socket2", "sync", "time", "tokio-macros", "tracing"] }
tokio-stream = { version = "0.1", features = ["fs", "net", "time"] }
@@ -107,6 +108,7 @@ either = { version = "1", features = ["use_std"] }
fixedbitset = { version = "0.4", features = ["std"] }
futures-channel = { version = "0.3", features = ["alloc", "futures-sink", "sink", "std"] }
futures-core = { version = "0.3", features = ["alloc", "std"] }
futures-executor = { version = "0.3", features = ["std"] }
futures-io = { version = "0.3", features = ["std"] }
futures-sink = { version = "0.3", features = ["alloc", "std"] }
futures-task = { version = "0.3", default-features = false, features = ["alloc", "std"] }
@@ -137,8 +139,8 @@ serde = { version = "1", features = ["derive", "rc", "serde_derive", "std"] }
serde_json = { version = "1", features = ["raw_value", "std"] }
sha2 = { version = "0.10", features = ["std"] }
smallvec = { version = "1", default-features = false, features = ["union"] }
sqlx-core = { version = "0.6", default-features = false, features = ["_rt-tokio", "_tls-rustls", "any", "base64", "crc", "dirs", "hkdf", "hmac", "json", "md-5", "migrate", "postgres", "rand", "runtime-tokio-rustls", "rustls", "rustls-pemfile", "serde", "serde_json", "sha1", "sha2", "tokio-stream", "uuid", "webpki-roots", "whoami"] }
sqlx-macros = { version = "0.6", default-features = false, features = ["_rt-tokio", "json", "migrate", "postgres", "runtime-tokio-rustls", "serde_json", "sha2", "uuid"] }
sqlx-core = { version = "0.6", default-features = false, features = ["_rt-tokio", "_tls-rustls", "any", "base64", "crc", "dirs", "flume", "futures-executor", "hkdf", "hmac", "json", "libsqlite3-sys", "md-5", "migrate", "postgres", "rand", "runtime-tokio-rustls", "rustls", "rustls-pemfile", "serde", "serde_json", "sha1", "sha2", "sqlite", "tokio-stream", "uuid", "webpki-roots", "whoami"] }
sqlx-macros = { version = "0.6", default-features = false, features = ["_rt-tokio", "json", "migrate", "postgres", "runtime-tokio-rustls", "serde_json", "sha2", "sqlite", "uuid"] }
syn = { version = "1", features = ["clone-impls", "derive", "extra-traits", "full", "parsing", "printing", "proc-macro", "quote", "visit", "visit-mut"] }
tokio = { version = "1", features = ["bytes", "fs", "io-std", "io-util", "libc", "macros", "memchr", "mio", "net", "num_cpus", "parking_lot", "rt", "rt-multi-thread", "signal", "signal-hook-registry", "socket2", "sync", "time", "tokio-macros", "tracing"] }
tokio-stream = { version = "0.1", features = ["fs", "net", "time"] }