diff --git a/.circleci/config.yml b/.circleci/config.yml
index 438a3b7171..f0ee2e8066 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -225,14 +225,6 @@ jobs:
       # When removing this, also remove the ignore on the test in trogging/src/cli.rs
       RUST_LOG: debug,,hyper::proto::h1=info,h2=info
       LOG_FILTER: debug,,hyper::proto::h1=info,h2=info
-      # TEMPORARY: Can be removed when the ingester that uses the write buffer is removed. Tests
-      # need to spin up separate servers because I've only been able to implement a "persist
-      # everything" API, and if tests run in parallel using a shared server, they interfere with
-      # each other. Starting separate servers with the maximum number of Rust test threads uses up
-      # all the Postgres connections in CI, so limit the parallelization until we switch completely
-      # to ingester2, which does have a "persist-per-namespace" API that means tests can run on
-      # shared MiniClusters.
-      RUST_TEST_THREADS: 8
       # Run the JDBC tests
       TEST_INFLUXDB_JDBC: "true"
     steps:
diff --git a/Cargo.lock b/Cargo.lock
index a6d9a3cbb9..0f21c689c2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2443,8 +2443,6 @@ dependencies = [
  "client_util",
  "data_types",
  "futures",
- "generated_types",
- "influxdb_iox_client",
  "iox_catalog",
  "metric",
  "object_store",
@@ -2455,7 +2453,6 @@ dependencies = [
  "thiserror",
  "tokio",
  "tokio-stream",
- "tonic 0.9.2",
  "workspace-hack",
 ]
 
@@ -2597,7 +2594,6 @@ dependencies = [
  "schema",
  "serde",
  "serde_json",
- "sharder",
  "snafu",
  "tempfile",
  "test_helpers",
@@ -2975,7 +2971,6 @@ dependencies = [
  "parquet_file",
  "predicate",
  "schema",
- "sharder",
  "uuid",
  "workspace-hack",
 ]
diff --git a/clap_blocks/Cargo.toml b/clap_blocks/Cargo.toml
index fce39986bb..6bc4c970af 100644
--- a/clap_blocks/Cargo.toml
+++ b/clap_blocks/Cargo.toml
@@ -19,7 +19,6 @@ observability_deps = { path = "../observability_deps" }
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0.96"
 snafu = "0.7"
-tempfile = "3.5.0"
 trace = { path = "../trace" }
 trace_exporters = { path = "../trace_exporters" }
 trogging = { path = "../trogging", default-features = false, features = ["clap"] }
@@ -27,6 +26,7 @@ uuid = { version = "1", features = ["v4"] }
 workspace-hack = { version = "0.1", path = "../workspace-hack" }
 
 [dev-dependencies]
+tempfile = "3.5.0"
 test_helpers = { path = "../test_helpers" }
 
 [features]
diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs
index 9adc3e8442..b21164e324 100644
--- a/data_types/src/lib.rs
+++ b/data_types/src/lib.rs
@@ -726,8 +726,8 @@ impl sqlx::Decode<'_, sqlx::Sqlite> for PartitionKey {
     }
 }
 
-/// Data object for a partition. The combination of shard, table and key are unique (i.e. only
-/// one record can exist for each combo)
+/// Data object for a partition. The combination of table and key are unique (i.e. only one record
+/// can exist for each combo)
 #[derive(Debug, Clone, PartialEq, Eq, sqlx::FromRow)]
 pub struct Partition {
     /// the id of the partition
@@ -1849,10 +1849,9 @@ pub const MIN_NANO_TIME: i64 = i64::MIN + 2;
 ///
 /// 2262-04-11 23:47:16.854775806 +0000 UTC
 ///
-/// The highest time represented by a nanosecond needs to be used for an
-/// exclusive range in the shard group, so the maximum time needs to be one
-/// less than the possible maximum number of nanoseconds representable by an
-/// int64 so that we don't lose a point at that one time.
+/// The highest time represented by a nanosecond needs to be used for an exclusive range, so the
+/// maximum time needs to be one less than the possible maximum number of nanoseconds representable
+/// by an int64 so that we don't lose a point at that one time.
 /// Source: [influxdb](https://github.com/influxdata/influxdb/blob/540bb66e1381a48a6d1ede4fc3e49c75a7d9f4af/models/time.go#L12-L34)
 pub const MAX_NANO_TIME: i64 = i64::MAX - 1;
 
diff --git a/docs/compactor.md b/docs/compactor.md
index 253a8cd201..b747752efe 100644
--- a/docs/compactor.md
+++ b/docs/compactor.md
@@ -1,6 +1,6 @@
 # Job of a Compactor
 
-Compactor is one of the servers in an IOx cluster and its main job is to compact many small and time-overlapped files into larger and non-time-overlapped files. Duplicated and soft deleted data will also be removed during compaction. There may be one or many Compactors in a cluster, each will be responsible for compacting files of a set of specified shards.
+Compactor is one of the servers in an IOx cluster and its main job is to compact many small and time-overlapped files into larger and non-time-overlapped files. Duplicated and soft deleted data will also be removed during compaction. There may be one or many Compactors in a cluster.
 
 - The purpose of compaction to increase query performance by
   1. Avoiding reading too many small files
@@ -46,8 +46,7 @@ If increasing memory a lot does not help, consider changing one or a combination
 These are [up-to-date configurable parameters](https://github.com/influxdata/influxdb_iox/blob/main/clap_blocks/src/compactor.rs). Here are a few key parameters you may want to tune for your needs:
 
 - **Size of the files:** The compactor cannot control the sizes of level-0 files but they are usually small and can be adjusted by config params of the Ingesters. The compactor decides the max desired size of level-1 and level-2 files which is around `INFLUXDB_IOX_COMPACTION_MAX_DESIRED_FILE_SIZE_BYTES * (100 + INFLUXDB_IOX_COMPACTION_PERCENTAGE_MAX_FILE_SIZE) / 100`.
-  - **Map a compactor to several shards:** Depending on your Ingester setup, there may be several shards. A compactor can be set up to compact all or a fraction of the shards. Use range `[INFLUXDB_IOX_SHARD_INDEX_RANGE_START, INFLUXDB_IOX_SHARD_INDEX_RANGE_END]` to map them.
-- **Number of partitions considered to compact per shard:** If there is enough memory, which is usually the case, the compactor will compact many partitions of the same or different shards concurrently. Depending on how many shards a compactor handles and how much memory that compactor is configured to use, you can increase/reduce the concurrent compaction level by increasing/reducing the number of partitions per shard by adjusting `INFLUXDB_IOX_COMPACTION_MAX_NUMBER_PARTITIONS_PER_SHARD`.
+- **Number of partitions considered to compact:** If there is enough memory, which is usually the case, the compactor will compact many partitions concurrently. Depending on how much memory that compactor is configured to use, you can increase/reduce the concurrent compaction level by increasing/reducing the number of partitions.
 - **Concurrency capacity:** to configure this based on your available memory, you need to understand how IOx estimates memory to compact files in the next section.
 
 # Memory Estimation
@@ -196,15 +195,15 @@ SELECT * FROM skipped_compactions;
 -- remove partitions from the skipped_compactions
 DELETE FROM skipped_compactions WHERE partition_id in ([your_ids]);
 
--- Content of skipped_compactions with their shard index, partition key and table id
-SELECT shard_index, table_id, partition_id, partition_key, left(reason, 25),
+-- Content of skipped_compactions with their partition key and table id
+SELECT table_id, partition_id, partition_key, left(reason, 25),
   num_files, limit_num_files, estimated_bytes, limit_bytes,
   to_timestamp(skipped_at) skipped_at
-FROM skipped_compactions, partition, shard
-WHERE partition.id = skipped_compactions.partition_id and partition.shard_id = shard.id
-ORDER BY shard_index, table_id, partition_key, skipped_at;
+FROM skipped_compactions, partition
+WHERE partition.id = skipped_compactions.partition_id
+ORDER BY table_id, partition_key, skipped_at;
 -- Number of files per level for top 50 partitions with most files of a specified day
-SELECT s.shard_index, pf.table_id, pf.partition_id, p.partition_key,
+SELECT pf.table_id, pf.partition_id, p.partition_key,
   count(case when pf.to_delete is null then 1 end) total_not_deleted,
   count(case when pf.compaction_level=0 and pf.to_delete is null then 1 end) num_l0,
   count(case when pf.compaction_level=1 and pf.to_delete is null then 1 end) num_l1,
@@ -212,10 +211,10 @@ SELECT s.shard_index, pf.table_id, pf.partition_id, p.partition_key,
   count(case when pf.compaction_level=0 and pf.to_delete is not null then 1 end) deleted_num_l0,
   count(case when pf.compaction_level=1 and pf.to_delete is not null then 1 end) deleted_num_l1,
   count(case when pf.compaction_level=2 and pf.to_delete is not null then 1 end) deleted_num_l2
-FROM parquet_file pf, partition p, shard s
-WHERE pf.partition_id = p.id AND pf.shard_id = s.id
+FROM parquet_file pf, partition p
+WHERE pf.partition_id = p.id
 AND p.partition_key = '2022-10-11'
-GROUP BY s.shard_index, pf.table_id, pf.partition_id, p.partition_key
+GROUP BY pf.table_id, pf.partition_id, p.partition_key
 ORDER BY count(case when pf.to_delete is null then 1 end) DESC
 LIMIT 50;
 
diff --git a/generated_types/build.rs b/generated_types/build.rs
index 585eef0a62..de783dd931 100644
--- a/generated_types/build.rs
+++ b/generated_types/build.rs
@@ -30,7 +30,6 @@ fn main() -> Result<()> {
 /// - `influxdata.iox.schema.v1.rs`
 /// - `influxdata.iox.wal.v1.rs`
 /// - `influxdata.iox.write.v1.rs`
-/// - `influxdata.iox.write_buffer.v1.rs`
 /// - `influxdata.platform.storage.rs`
 fn generate_grpc_types(root: &Path) -> Result<()> {
     let authz_path = root.join("influxdata/iox/authz/v1");
@@ -44,7 +43,6 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
     let querier_path = root.join("influxdata/iox/querier/v1");
     let schema_path = root.join("influxdata/iox/schema/v1");
     let wal_path = root.join("influxdata/iox/wal/v1");
-    let write_buffer_path = root.join("influxdata/iox/write_buffer/v1");
     let storage_path = root.join("influxdata/platform/storage");
     let storage_errors_path = root.join("influxdata/platform/errors");
 
@@ -70,7 +68,6 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
         root.join("influxdata/pbdata/v1/influxdb_pb_data_protocol.proto"),
         schema_path.join("service.proto"),
         wal_path.join("wal.proto"),
-        write_buffer_path.join("write_buffer.proto"),
         storage_path.join("predicate.proto"),
         storage_path.join("service.proto"),
         storage_path.join("source.proto"),
diff --git a/generated_types/protos/influxdata/iox/write_buffer/v1/write_buffer.proto b/generated_types/protos/influxdata/iox/write_buffer/v1/write_buffer.proto
deleted file mode 100644
index 4d61e6b651..0000000000
--- a/generated_types/protos/influxdata/iox/write_buffer/v1/write_buffer.proto
+++ /dev/null
@@ -1,63 +0,0 @@
-syntax = "proto3";
-package influxdata.iox.write_buffer.v1;
-option go_package = "github.com/influxdata/iox/write_buffer/v1";
-
-import "influxdata/iox/delete/v1/service.proto";
-import "influxdata/pbdata/v1/influxdb_pb_data_protocol.proto";
-
-// Configures the use of a write buffer.
-message WriteBufferConnection {
-  reserved 1;
-  reserved "direction";
-
-  // Which type should be used (e.g. "kafka", "mock")
-  string type = 2;
-
-  // Connection string, depends on `type`.
-  string connection = 3;
-
-  // Old non-nested auto-creation config.
-  reserved 4, 5, 7;
-
-  // Special configs to be applied when establishing the connection.
-  //
-  // This depends on `type` and can configure aspects like timeouts.
-  map<string, string> connection_config = 6;
-
-  // Specifies if the shards (e.g. for Kafka in form of a topic w/ `n_shards` partitions) should be
-  // automatically created if they do not existing prior to reading or writing.
-  WriteBufferCreationConfig creation_config = 8;
-}
-
-// Configs shard auto-creation for write buffers.
-//
-// What that means depends on the used write buffer, e.g. for Kafka this will create a new topic w/
-// `n_shards` partitions.
-message WriteBufferCreationConfig {
-  // Renamed from n_sequencers to n_shards.
-  reserved 1;
-  reserved "n_sequencers";
-
-  // Number of shards.
-  //
-  // How they are implemented depends on `type`, e.g. for Kafka this is mapped to the number of
-  // partitions.
-  //
-  // If 0, a server-side default is used
-  uint32 n_shards = 3;
-
-  // Special configs to by applied when shards are created.
-  //
-  // This depends on `type` and can setup parameters like retention policy.
-  //
-  // Contains 0 or more key value pairs
-  map<string, string> options = 2;
-}
-
-// A write payload for the write buffer
-message WriteBufferPayload {
-  oneof payload {
-    influxdata.pbdata.v1.DatabaseBatch write = 1;
-    influxdata.iox.delete.v1.DeletePayload delete = 2;
-  }
-}
diff --git a/generated_types/src/ingester.rs b/generated_types/src/ingester.rs
index 0b1c9c63d5..e38b09a6e3 100644
--- a/generated_types/src/ingester.rs
+++ b/generated_types/src/ingester.rs
@@ -38,8 +38,7 @@ pub struct IngesterQueryRequest {
 }
 
 impl IngesterQueryRequest {
-    /// Make a request to return data for a specified table for
-    /// all shards an ingester is responsible for
+    /// Make a request to return data for a specified table
     pub fn new(
         namespace_id: NamespaceId,
         table_id: TableId,
diff --git a/generated_types/src/lib.rs b/generated_types/src/lib.rs
index 5d274b578d..b718f51a44 100644
--- a/generated_types/src/lib.rs
+++ b/generated_types/src/lib.rs
@@ -173,19 +173,6 @@ pub mod influxdata {
                 include!(concat!(env!("OUT_DIR"), "/influxdata.iox.wal.v1.serde.rs"));
             }
         }
-
-        pub mod write_buffer {
-            pub mod v1 {
-                include!(concat!(
-                    env!("OUT_DIR"),
-                    "/influxdata.iox.write_buffer.v1.rs"
-                ));
-                include!(concat!(
-                    env!("OUT_DIR"),
-                    "/influxdata.iox.write_buffer.v1.serde.rs"
-                ));
-            }
-        }
     }
 
     pub mod pbdata {
diff --git a/import/Cargo.toml b/import/Cargo.toml
index b3a83baf4f..a46275205e 100644
--- a/import/Cargo.toml
+++ b/import/Cargo.toml
@@ -9,8 +9,6 @@ license.workspace = true
 chrono = { version = "0.4", default-features = false }
 data_types = { path = "../data_types" }
 futures = "0.3"
-generated_types = { path = "../generated_types" }
-influxdb_iox_client = { path = "../influxdb_iox_client" }
 iox_catalog = { path = "../iox_catalog" }
 object_store = { version = "0.5.6", features = ["aws"] }
 schema = { path = "../schema" }
@@ -18,7 +16,6 @@ serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0.96"
 thiserror = "1.0.40"
 tokio = { version = "1.28" }
-tonic = { workspace = true }
 workspace-hack = { version = "0.1", path = "../workspace-hack" }
 
 [dev-dependencies]
diff --git a/influxdb_iox/Cargo.toml b/influxdb_iox/Cargo.toml
index 8d4e158c7f..3b6bf304b5 100644
--- a/influxdb_iox/Cargo.toml
+++ b/influxdb_iox/Cargo.toml
@@ -37,7 +37,6 @@ parquet_to_line_protocol = { path = "../parquet_to_line_protocol" }
 prost = { version = "0.11" }
 iox_query = { path = "../iox_query" }
 schema = { path = "../schema" }
-sharder = { path = "../sharder" }
 iox_time = { path = "../iox_time" }
 trace_exporters = { path = "../trace_exporters" }
 trogging = { path = "../trogging", default-features = false, features = ["clap"] }
diff --git a/influxdb_iox/tests/end_to_end_cases/cli.rs b/influxdb_iox/tests/end_to_end_cases/cli.rs
index d4247297ff..3d8c135f87 100644
--- a/influxdb_iox/tests/end_to_end_cases/cli.rs
+++ b/influxdb_iox/tests/end_to_end_cases/cli.rs
@@ -83,7 +83,6 @@ async fn parquet_to_lp() {
     // Looks like:
     // {
     //   "id": "1",
-    //   "shardId": 1,
     //   "namespaceId": 1,
     //   "tableId": 1,
     //   "partitionId": "1",
diff --git a/ingester/README.md b/ingester/README.md
index 0f73ea5562..7ad6da613d 100644
--- a/ingester/README.md
+++ b/ingester/README.md
@@ -2,19 +2,15 @@
 
 ## Quick run
 
-Set-up empty catalog db:
+Set up empty catalog db:
 
 ```bash
 mkdir -p /tmp/iox/{wal,obj}
 
 createdb iox_shared
 
 ./target/debug/influxdb_iox catalog setup --catalog-dsn postgres:///iox_shared
-
-# there has to exist one "topic", see https://github.com/influxdata/influxdb_iox/issues/6420
-psql 'dbname=iox_shared options=-csearch_path=public,iox_catalog' -c "insert into topic (name) values ('iox-shared')"
 ```
-
 Run ingester:
 
 ```bash
diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs
index 884d224d01..3d1e0e467a 100644
--- a/iox_catalog/src/interface.rs
+++ b/iox_catalog/src/interface.rs
@@ -404,11 +404,11 @@ pub trait ColumnRepo: Send + Sync {
     async fn list(&mut self) -> Result<Vec<Column>>;
 }
 
-/// Functions for working with IOx partitions in the catalog. Note that these are how IOx splits up
-/// data within a namespace, which is different than Kafka partitions.
+/// Functions for working with IOx partitions in the catalog. These are how IOx splits up
+/// data within a namespace.
 #[async_trait]
 pub trait PartitionRepo: Send + Sync {
-    /// create or get a partition record for the given partition key, shard and table
+    /// create or get a partition record for the given partition key and table
     async fn create_or_get(&mut self, key: PartitionKey, table_id: TableId) -> Result<Partition>;
 
     /// get partition by ID
diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs
index 930e84b1c2..a67c21f1e1 100644
--- a/iox_catalog/src/mem.rs
+++ b/iox_catalog/src/mem.rs
@@ -7,7 +7,6 @@ use crate::{
         Error, NamespaceRepo, ParquetFileRepo, PartitionRepo, RepoCollection, Result,
         SoftDeletedRows, TableRepo, Transaction, MAX_PARQUET_FILES_SELECTED_ONCE,
     },
-    kafkaless_transition::{Shard, SHARED_TOPIC_ID, TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX},
     metrics::MetricDecorator,
     DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES,
 };
@@ -66,7 +65,6 @@ struct MemCollections {
     namespaces: Vec<Namespace>,
     tables: Vec<Table>,
     columns: Vec<Column>,
-    shards: Vec<Shard>,
     partitions: Vec<Partition>,
     skipped_compactions: Vec<SkippedCompaction>,
     parquet_files: Vec<ParquetFile>,
@@ -121,26 +119,6 @@ impl Display for MemCatalog {
 #[async_trait]
 impl Catalog for MemCatalog {
     async fn setup(&self) -> Result<(), Error> {
-        let guard = Arc::clone(&self.collections).lock_owned().await;
-        let stage = guard.clone();
-        let mut transaction = MemTxn {
-            inner: MemTxnInner::Txn {
-                guard,
-                stage,
-                finalized: false,
-            },
-            time_provider: self.time_provider(),
-        };
-        let stage = transaction.stage();
-
-        // The transition shard must exist and must have magic ID and INDEX.
-        let shard = Shard {
-            id: TRANSITION_SHARD_ID,
-            topic_id: SHARED_TOPIC_ID,
-            shard_index: TRANSITION_SHARD_INDEX,
-        };
-        stage.shards.push(shard);
-        transaction.commit_inplace().await?;
         Ok(())
     }
 
diff --git a/iox_tests/Cargo.toml b/iox_tests/Cargo.toml
index 9b68dca297..213365cced 100644
--- a/iox_tests/Cargo.toml
+++ b/iox_tests/Cargo.toml
@@ -22,7 +22,6 @@ parquet_file = { path = "../parquet_file" }
 predicate = { path = "../predicate" }
 iox_query = { path = "../iox_query" }
 schema = { path = "../schema" }
-sharder = { path = "../sharder" }
 uuid = { version = "1", features = ["v4"] }
 workspace-hack = { version = "0.1", path = "../workspace-hack" }
 futures = "0.3.28"
diff --git a/router/src/dml_handlers/rpc_write/balancer.rs b/router/src/dml_handlers/rpc_write/balancer.rs
index 327bd3169b..fecf664722 100644
--- a/router/src/dml_handlers/rpc_write/balancer.rs
+++ b/router/src/dml_handlers/rpc_write/balancer.rs
@@ -31,7 +31,7 @@ const METRIC_EVAL_INTERVAL: Duration = Duration::from_secs(3);
 ///
 /// # Request Distribution
 ///
-/// Requests are distributed uniformly across all shards **per thread**. Given
+/// Requests are distributed uniformly across all endpoints **per thread**. Given
 /// enough requests (where `N` is significantly larger than the number of
 /// threads) an approximately uniform distribution is achieved.
 #[derive(Debug)]