fix: Rename ingester2 to ingester
parent
be6bcdef45
commit
56916cf942
|
@ -2679,7 +2679,7 @@ dependencies = [
|
|||
]
|
||||
|
||||
[[package]]
|
||||
name = "ingester2"
|
||||
name = "ingester"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
|
@ -2748,7 +2748,7 @@ dependencies = [
|
|||
"generated_types",
|
||||
"hashbrown 0.13.2",
|
||||
"influxdb_iox_client",
|
||||
"ingester2",
|
||||
"ingester",
|
||||
"iox_catalog",
|
||||
"iox_query",
|
||||
"iox_time",
|
||||
|
@ -3081,7 +3081,7 @@ dependencies = [
|
|||
"futures",
|
||||
"generated_types",
|
||||
"hyper",
|
||||
"ingester2",
|
||||
"ingester",
|
||||
"iox_catalog",
|
||||
"iox_query",
|
||||
"ioxd_common",
|
||||
|
|
|
@ -29,7 +29,7 @@ members = [
|
|||
"influxdb2_client",
|
||||
"influxrpc_parser",
|
||||
"ingester2_test_ctx",
|
||||
"ingester2",
|
||||
"ingester",
|
||||
"iox_catalog",
|
||||
"iox_data_generator",
|
||||
"iox_query_influxql",
|
||||
|
|
|
@ -5,7 +5,7 @@ use std::path::PathBuf;
|
|||
/// CLI config for the ingester using the RPC write path
|
||||
#[derive(Debug, Clone, clap::Parser)]
|
||||
#[allow(missing_copy_implementations)]
|
||||
pub struct Ingester2Config {
|
||||
pub struct IngesterConfig {
|
||||
/// Where this ingester instance should store its write-ahead log files. Each ingester instance
|
||||
/// must have its own directory.
|
||||
#[clap(long = "wal-directory", env = "INFLUXDB_IOX_WAL_DIRECTORY", action)]
|
|
@ -15,7 +15,7 @@
|
|||
pub mod catalog_dsn;
|
||||
pub mod compactor2;
|
||||
pub mod garbage_collector;
|
||||
pub mod ingester2;
|
||||
pub mod ingester;
|
||||
pub mod ingester_address;
|
||||
pub mod object_store;
|
||||
pub mod querier;
|
||||
|
|
|
@ -108,7 +108,7 @@ INFLUXDB_IOX_MAX_HTTP_REQUEST_SIZE=100000000 \
|
|||
OBJECT_STORE=file \
|
||||
DATABASE_DIRECTORY=~/data_dir \
|
||||
LOG_FILTER=info \
|
||||
./target/release/influxdb_iox run ingester2
|
||||
./target/release/influxdb_iox run ingester
|
||||
```
|
||||
|
||||
## Run Router on port 8080/8081 (http/grpc)
|
||||
|
|
|
@ -4,7 +4,7 @@ option go_package = "github.com/influxdata/iox/ingester/v1";
|
|||
|
||||
import "influxdata/pbdata/v1/influxdb_pb_data_protocol.proto";
|
||||
|
||||
// A service provided by Ingester2 instances, called by Ingester Replicas.
|
||||
// A service provided by Ingester instances, called by Ingester Replicas.
|
||||
service PartitionBufferService {
|
||||
// Acquire the full in-memory state of the recipient ingester, returning the
|
||||
// data of all known partitions.
|
||||
|
@ -19,13 +19,13 @@ message GetPartitionBuffersRequest {}
|
|||
message GetPartitionBuffersResponse {
|
||||
// The unique, per-instance UUID of the ingester pushing this operation.
|
||||
string ingester_uuid = 1;
|
||||
|
||||
|
||||
// The catalog ID of the namespace this partition belongs to.
|
||||
int64 namespace_id = 2;
|
||||
|
||||
|
||||
// The catalog ID of the table this partition belongs to.
|
||||
int64 table_id = 3;
|
||||
|
||||
|
||||
// The catalog ID of this partition.
|
||||
int64 partition_id = 4;
|
||||
|
||||
|
@ -41,7 +41,7 @@ message GetPartitionBuffersResponse {
|
|||
}
|
||||
|
||||
// A service provided by Ingester Replica instances to accept pushed events from
|
||||
// an Ingester2 instance.
|
||||
// an Ingester instance.
|
||||
service ReplicationService {
|
||||
// Push the provided write request to the replica.
|
||||
rpc Replicate(ReplicateRequest) returns (ReplicateResponse);
|
||||
|
@ -70,10 +70,10 @@ message ReplicateResponse {}
|
|||
message PersistCompleteRequest {
|
||||
// The unique, per-instance UUID of the ingester pushing this operation.
|
||||
string ingester_uuid = 1;
|
||||
|
||||
|
||||
// The catalog ID of the namespace that has had data persisted.
|
||||
int64 namespace_id = 2;
|
||||
|
||||
|
||||
// The catalog ID of the table that has had data persisted.
|
||||
int64 table_id = 3;
|
||||
|
||||
|
|
|
@ -6,7 +6,7 @@ use super::main;
|
|||
use clap_blocks::{
|
||||
catalog_dsn::CatalogDsnConfig,
|
||||
compactor2::Compactor2Config,
|
||||
ingester2::Ingester2Config,
|
||||
ingester::IngesterConfig,
|
||||
ingester_address::IngesterAddress,
|
||||
object_store::{make_object_store, ObjectStoreConfig},
|
||||
querier::QuerierConfig,
|
||||
|
@ -456,7 +456,7 @@ impl Config {
|
|||
.clone()
|
||||
.with_grpc_bind_address(compactor_grpc_bind_address);
|
||||
|
||||
let ingester_config = Ingester2Config {
|
||||
let ingester_config = IngesterConfig {
|
||||
wal_directory,
|
||||
wal_rotation_period_seconds,
|
||||
concurrent_query_limit,
|
||||
|
@ -555,7 +555,7 @@ struct SpecializedConfig {
|
|||
compactor_run_config: RunConfig,
|
||||
|
||||
catalog_dsn: CatalogDsnConfig,
|
||||
ingester_config: Ingester2Config,
|
||||
ingester_config: IngesterConfig,
|
||||
router_config: Router2Config,
|
||||
compactor_config: Compactor2Config,
|
||||
querier_config: QuerierConfig,
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
use super::main;
|
||||
use crate::process_info::{setup_metric_registry, USIZE_MAX};
|
||||
use clap_blocks::{
|
||||
catalog_dsn::CatalogDsnConfig, ingester2::Ingester2Config, object_store::make_object_store,
|
||||
catalog_dsn::CatalogDsnConfig, ingester::IngesterConfig, object_store::make_object_store,
|
||||
run_config::RunConfig,
|
||||
};
|
||||
use iox_query::exec::Executor;
|
||||
|
@ -32,7 +32,7 @@ pub enum Error {
|
|||
#[error("cannot parse object store config: {0}")]
|
||||
ObjectStoreParsing(#[from] clap_blocks::object_store::ParseError),
|
||||
|
||||
#[error("error initializing ingester2: {0}")]
|
||||
#[error("error initializing ingester: {0}")]
|
||||
Ingester(#[from] ioxd_ingester2::Error),
|
||||
|
||||
#[error("catalog DSN error: {0}")]
|
||||
|
@ -63,7 +63,7 @@ pub struct Config {
|
|||
pub(crate) catalog_dsn: CatalogDsnConfig,
|
||||
|
||||
#[clap(flatten)]
|
||||
pub(crate) ingester_config: Ingester2Config,
|
||||
pub(crate) ingester_config: IngesterConfig,
|
||||
|
||||
/// Specify the size of the thread-pool for query execution, and the
|
||||
/// separate compaction thread-pool.
|
||||
|
@ -125,7 +125,7 @@ pub async fn command(config: Config) -> Result<()> {
|
|||
)
|
||||
.await?;
|
||||
|
||||
info!("starting ingester2");
|
||||
info!("starting ingester");
|
||||
|
||||
let services = vec![Service::create(server_type, common_state.run_config())];
|
||||
Ok(main::main(common_state, services, metric_registry).await?)
|
|
@ -4,7 +4,7 @@ use trogging::cli::LoggingConfig;
|
|||
pub(crate) mod all_in_one;
|
||||
mod compactor2;
|
||||
mod garbage_collector;
|
||||
mod ingester2;
|
||||
mod ingester;
|
||||
mod main;
|
||||
mod querier;
|
||||
mod router2;
|
||||
|
@ -25,8 +25,8 @@ pub enum Error {
|
|||
#[snafu(display("Error in router2 subcommand: {}", source))]
|
||||
Router2Error { source: router2::Error },
|
||||
|
||||
#[snafu(display("Error in ingester2 subcommand: {}", source))]
|
||||
Ingester2Error { source: ingester2::Error },
|
||||
#[snafu(display("Error in ingester subcommand: {}", source))]
|
||||
IngesterError { source: ingester::Error },
|
||||
|
||||
#[snafu(display("Error in all in one subcommand: {}", source))]
|
||||
AllInOneError { source: all_in_one::Error },
|
||||
|
@ -55,7 +55,7 @@ impl Config {
|
|||
Some(Command::GarbageCollector(config)) => config.run_config.logging_config(),
|
||||
Some(Command::Querier(config)) => config.run_config.logging_config(),
|
||||
Some(Command::Router2(config)) => config.run_config.logging_config(),
|
||||
Some(Command::Ingester2(config)) => config.run_config.logging_config(),
|
||||
Some(Command::Ingester(config)) => config.run_config.logging_config(),
|
||||
Some(Command::AllInOne(config)) => &config.logging_config,
|
||||
Some(Command::Test(config)) => config.run_config.logging_config(),
|
||||
}
|
||||
|
@ -73,8 +73,8 @@ enum Command {
|
|||
/// Run the server in router2 mode
|
||||
Router2(router2::Config),
|
||||
|
||||
/// Run the server in ingester2 mode
|
||||
Ingester2(ingester2::Config),
|
||||
/// Run the server in ingester mode
|
||||
Ingester(ingester::Config),
|
||||
|
||||
/// Run the server in "all in one" mode (Default)
|
||||
AllInOne(all_in_one::Config),
|
||||
|
@ -99,9 +99,7 @@ pub async fn command(config: Config) -> Result<()> {
|
|||
.context(GarbageCollectorSnafu),
|
||||
Some(Command::Querier(config)) => querier::command(config).await.context(QuerierSnafu),
|
||||
Some(Command::Router2(config)) => router2::command(config).await.context(Router2Snafu),
|
||||
Some(Command::Ingester2(config)) => {
|
||||
ingester2::command(config).await.context(Ingester2Snafu)
|
||||
}
|
||||
Some(Command::Ingester(config)) => ingester::command(config).await.context(IngesterSnafu),
|
||||
Some(Command::AllInOne(config)) => all_in_one::command(config).await.context(AllInOneSnafu),
|
||||
Some(Command::Test(config)) => test::command(config).await.context(TestSnafu),
|
||||
}
|
||||
|
|
|
@ -10,7 +10,7 @@ async fn shard_id_greater_than_num_shards_is_invalid() {
|
|||
test_helpers::maybe_start_logging();
|
||||
let database_url = maybe_skip_integration!();
|
||||
|
||||
let ingester_config = TestConfig::new_ingester2(&database_url);
|
||||
let ingester_config = TestConfig::new_ingester(&database_url);
|
||||
let router_config = TestConfig::new_router2(&ingester_config);
|
||||
let querier_config = TestConfig::new_querier2(&ingester_config).with_querier_mem_pool_bytes(1);
|
||||
let compactor_config = TestConfig::new_compactor2(&ingester_config).with_compactor_shards(
|
||||
|
@ -95,7 +95,7 @@ async fn sharded_compactor_0_always_compacts_partition_1() {
|
|||
|
||||
// The test below assumes a specific partition id, and it needs to customize the compactor
|
||||
// config, so use a non-shared minicluster here.
|
||||
let ingester_config = TestConfig::new_ingester2(&database_url);
|
||||
let ingester_config = TestConfig::new_ingester(&database_url);
|
||||
let router_config = TestConfig::new_router2(&ingester_config);
|
||||
let querier_config = TestConfig::new_querier2(&ingester_config).with_querier_mem_pool_bytes(1);
|
||||
let compactor_config = TestConfig::new_compactor2(&ingester_config).with_compactor_shards(
|
||||
|
@ -178,7 +178,7 @@ async fn sharded_compactor_1_never_compacts_partition_1() {
|
|||
|
||||
// The test below assumes a specific partition id, and it needs to customize the compactor
|
||||
// config, so use a non-shared minicluster here.
|
||||
let ingester_config = TestConfig::new_ingester2(&database_url);
|
||||
let ingester_config = TestConfig::new_ingester(&database_url);
|
||||
let router_config = TestConfig::new_router2(&ingester_config);
|
||||
let querier_config = TestConfig::new_querier2(&ingester_config).with_querier_mem_pool_bytes(1);
|
||||
let compactor_config = TestConfig::new_compactor2(&ingester_config).with_compactor_shards(
|
||||
|
|
|
@ -12,7 +12,7 @@ async fn querier_namespace_client() {
|
|||
|
||||
let table_name = "the_table";
|
||||
|
||||
let ingester_config = TestConfig::new_ingester2(&database_url);
|
||||
let ingester_config = TestConfig::new_ingester(&database_url);
|
||||
let router_config = TestConfig::new_router2(&ingester_config);
|
||||
let querier_config = TestConfig::new_querier2(&ingester_config);
|
||||
|
||||
|
|
|
@ -125,10 +125,10 @@ async fn basic_empty() {
|
|||
let table_name = "the_table";
|
||||
|
||||
// Set up the cluster ====================================
|
||||
let ingester_config = TestConfig::new_ingester2(&database_url);
|
||||
let ingester_config = TestConfig::new_ingester(&database_url);
|
||||
let router_config = TestConfig::new_router2(&ingester_config);
|
||||
// specially create a querier2 config that is NOT connected to the ingester2
|
||||
let querier_config = TestConfig::new_querier2_without_ingester2(&ingester_config);
|
||||
// specially create a querier2 config that is NOT connected to the ingester
|
||||
let querier_config = TestConfig::new_querier2_without_ingester(&ingester_config);
|
||||
|
||||
let mut cluster = MiniCluster::new()
|
||||
.with_ingester(ingester_config)
|
||||
|
@ -196,10 +196,10 @@ async fn basic_no_ingester_connection() {
|
|||
let table_name = "the_table";
|
||||
|
||||
// Set up the cluster ====================================
|
||||
let ingester_config = TestConfig::new_ingester2(&database_url);
|
||||
let ingester_config = TestConfig::new_ingester(&database_url);
|
||||
let router_config = TestConfig::new_router2(&ingester_config);
|
||||
// specially create a querier2 config that is NOT connected to the ingester2
|
||||
let querier_config = TestConfig::new_querier2_without_ingester2(&ingester_config);
|
||||
// specially create a querier2 config that is NOT connected to the ingester
|
||||
let querier_config = TestConfig::new_querier2_without_ingester(&ingester_config);
|
||||
|
||||
let mut cluster = MiniCluster::new()
|
||||
.with_ingester(ingester_config)
|
||||
|
@ -636,7 +636,7 @@ async fn oom_protection() {
|
|||
let table_name = "the_table";
|
||||
|
||||
// Set up the cluster ====================================
|
||||
let ingester_config = TestConfig::new_ingester2(&database_url);
|
||||
let ingester_config = TestConfig::new_ingester(&database_url);
|
||||
let router_config = TestConfig::new_router2(&ingester_config);
|
||||
let querier_config = TestConfig::new_querier2(&ingester_config).with_querier_mem_pool_bytes(1);
|
||||
let mut cluster = MiniCluster::new()
|
||||
|
|
|
@ -12,7 +12,7 @@ async fn basic_multi_ingesters() {
|
|||
let database_url = maybe_skip_integration!();
|
||||
test_helpers::maybe_start_logging();
|
||||
|
||||
let ingester1_config = TestConfig::new_ingester2_never_persist(&database_url);
|
||||
let ingester1_config = TestConfig::new_ingester_never_persist(&database_url);
|
||||
let ingester2_config = TestConfig::another_ingester(&ingester1_config);
|
||||
let ingester_configs = [ingester1_config, ingester2_config];
|
||||
|
||||
|
@ -95,7 +95,7 @@ async fn write_replication() {
|
|||
|
||||
let table_name = "some_table";
|
||||
|
||||
let ingester1_config = TestConfig::new_ingester2_never_persist(&database_url);
|
||||
let ingester1_config = TestConfig::new_ingester_never_persist(&database_url);
|
||||
let ingester2_config = TestConfig::another_ingester(&ingester1_config);
|
||||
let ingester_configs = [ingester1_config, ingester2_config];
|
||||
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
[package]
|
||||
name = "ingester2"
|
||||
name = "ingester"
|
||||
version.workspace = true
|
||||
authors.workspace = true
|
||||
edition.workspace = true
|
|
@ -1,4 +1,4 @@
|
|||
# ingester2
|
||||
# ingester
|
||||
|
||||
## Quick run
|
||||
|
||||
|
@ -15,10 +15,10 @@ psql 'dbname=iox_shared options=-csearch_path=public,iox_catalog' -c "insert int
|
|||
```
|
||||
|
||||
|
||||
Run ingester2:
|
||||
Run ingester:
|
||||
|
||||
```bash
|
||||
./target/debug/influxdb_iox run ingester2 --api-bind=127.0.0.1:8081 --grpc-bind=127.0.0.1:8042 --wal-directory /tmp/iox/wal --catalog-dsn postgres:///iox_shared --object-store=file --data-dir=/tmp/iox/obj -v
|
||||
./target/debug/influxdb_iox run ingester --api-bind=127.0.0.1:8081 --grpc-bind=127.0.0.1:8042 --wal-directory /tmp/iox/wal --catalog-dsn postgres:///iox_shared --object-store=file --data-dir=/tmp/iox/obj -v
|
||||
```
|
||||
|
||||
Run router2:
|
|
@ -1,9 +1,9 @@
|
|||
## `ingester2` benchmarks
|
||||
## `ingester` benchmarks
|
||||
|
||||
Run them like this:
|
||||
|
||||
```console
|
||||
% cargo bench -p ingester2 --features=benches
|
||||
% cargo bench -p ingester --features=benches
|
||||
```
|
||||
|
||||
This is required to mark internal types as `pub`, allowing the benchmarks to
|
|
@ -7,7 +7,7 @@ use dml::{DmlMeta, DmlOperation, DmlWrite};
|
|||
use generated_types::influxdata::{
|
||||
iox::wal::v1::sequenced_wal_op::Op as WalOp, pbdata::v1::DatabaseBatch,
|
||||
};
|
||||
use ingester2::{
|
||||
use ingester::{
|
||||
buffer_tree::benches::PartitionData,
|
||||
dml_sink::{DmlError, DmlSink},
|
||||
};
|
||||
|
@ -61,10 +61,10 @@ fn wal_replay_bench(c: &mut Criterion) {
|
|||
// overhead.
|
||||
let sink = NopSink::default();
|
||||
|
||||
let persist = ingester2::persist::queue::benches::MockPersistQueue::default();
|
||||
let persist = ingester::persist::queue::benches::MockPersistQueue::default();
|
||||
|
||||
// Replay the wal into the NOP.
|
||||
ingester2::benches::replay(
|
||||
ingester::benches::replay(
|
||||
&wal,
|
||||
&sink,
|
||||
Arc::new(persist),
|
||||
|
@ -117,7 +117,7 @@ impl DmlSink for NopSink {
|
|||
}
|
||||
}
|
||||
|
||||
impl ingester2::partition_iter::PartitionIter for NopSink {
|
||||
impl ingester::partition_iter::PartitionIter for NopSink {
|
||||
fn partition_iter(
|
||||
&self,
|
||||
) -> Box<dyn Iterator<Item = std::sync::Arc<parking_lot::Mutex<PartitionData>>> + Send> {
|
|
@ -8,7 +8,7 @@ use generated_types::influxdata::{
|
|||
iox::ingester::v1::write_service_server::WriteService, pbdata::v1::DatabaseBatch,
|
||||
};
|
||||
use influxdb_iox_client::ingester::generated_types::WriteRequest;
|
||||
use ingester2::IngesterRpcInterface;
|
||||
use ingester::IngesterRpcInterface;
|
||||
use ingester2_test_ctx::{TestContext, TestContextBuilder};
|
||||
use iox_time::TimeProvider;
|
||||
use mutable_batch_lp::lines_to_batches;
|
|
@ -87,7 +87,7 @@ pub trait IngesterRpcInterface: Send + Sync + std::fmt::Debug {
|
|||
fn query_service(&self, max_simultaneous_requests: usize) -> Self::FlightHandler;
|
||||
}
|
||||
|
||||
/// A RAII guard to clean up `ingester2` instance resources when dropped.
|
||||
/// A RAII guard to clean up `ingester` instance resources when dropped.
|
||||
#[must_use = "ingester stops when guard is dropped"]
|
||||
#[derive(Debug)]
|
||||
pub struct IngesterGuard<T> {
|
||||
|
@ -128,7 +128,7 @@ impl<T> Drop for IngesterGuard<T> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Errors that occur during initialisation of an `ingester2` instance.
|
||||
/// Errors that occur during initialisation of an `ingester` instance.
|
||||
#[derive(Debug, Error)]
|
||||
pub enum InitError {
|
||||
/// A catalog error occurred while fetching the most recent partitions for
|
||||
|
@ -145,14 +145,14 @@ pub enum InitError {
|
|||
WalReplay(Box<dyn std::error::Error>),
|
||||
}
|
||||
|
||||
/// Initialise a new `ingester2` instance, returning the gRPC service handler
|
||||
/// Initialise a new `ingester` instance, returning the gRPC service handler
|
||||
/// implementations to be bound by the caller.
|
||||
///
|
||||
/// ## WAL Replay
|
||||
///
|
||||
/// Writes through an `ingester2` instance commit to a durable write-ahead log.
|
||||
/// Writes through an `ingester` instance commit to a durable write-ahead log.
|
||||
///
|
||||
/// During initialisation of an `ingester2` instance, any files in
|
||||
/// During initialisation of an `ingester` instance, any files in
|
||||
/// `wal_directory` are read assuming they are redo log files from the
|
||||
/// write-ahead log.
|
||||
///
|
|
@ -149,7 +149,7 @@
|
|||
//!
|
||||
//! ## Write Reordering
|
||||
//!
|
||||
//! A write that enters an `ingester2` instance can be reordered arbitrarily
|
||||
//! A write that enters an `ingester` instance can be reordered arbitrarily
|
||||
//! with concurrent write requests.
|
||||
//!
|
||||
//! For example, two gRPC writes can race to be committed to the [`wal`], and
|
|
@ -203,7 +203,7 @@ impl Context {
|
|||
guard.update_sort_key(Some(new_sort_key.clone()));
|
||||
};
|
||||
|
||||
// Assert the internal (to this ingester2 instance) serialisation of
|
||||
// Assert the internal (to this ingester instance) serialisation of
|
||||
// sort key updates.
|
||||
//
|
||||
// Both of these get() should not block due to both of the values having
|
|
@ -17,7 +17,7 @@ futures = "0.3.28"
|
|||
generated_types = { version = "0.1.0", path = "../generated_types" }
|
||||
hashbrown.workspace = true
|
||||
influxdb_iox_client = { path = "../influxdb_iox_client" }
|
||||
ingester2 = { path = "../ingester2" }
|
||||
ingester = { path = "../ingester" }
|
||||
iox_catalog = { version = "0.1.0", path = "../iox_catalog" }
|
||||
iox_query = { version = "0.1.0", path = "../iox_query" }
|
||||
iox_time = { path = "../iox_time" }
|
||||
|
|
|
@ -25,7 +25,7 @@ use generated_types::influxdata::iox::ingester::v1::{
|
|||
write_service_server::WriteService, WriteRequest,
|
||||
};
|
||||
use influxdb_iox_client::flight;
|
||||
use ingester2::{IngesterGuard, IngesterRpcInterface};
|
||||
use ingester::{IngesterGuard, IngesterRpcInterface};
|
||||
use iox_catalog::{
|
||||
interface::{Catalog, SoftDeletedRows},
|
||||
validate_or_insert_schema,
|
||||
|
@ -49,12 +49,12 @@ pub const DEFAULT_MAX_PERSIST_QUEUE_DEPTH: usize = 5;
|
|||
/// [`TestContextBuilder::with_persist_hot_partition_cost()`].
|
||||
pub const DEFAULT_PERSIST_HOT_PARTITION_COST: usize = 20_000_000;
|
||||
|
||||
/// Construct a new [`TestContextBuilder`] to make a [`TestContext`] for an [`ingester2`] instance.
|
||||
/// Construct a new [`TestContextBuilder`] to make a [`TestContext`] for an [`ingester`] instance.
|
||||
pub fn test_context() -> TestContextBuilder {
|
||||
TestContextBuilder::default()
|
||||
}
|
||||
|
||||
/// Configure and construct a [`TestContext`] containing an [`ingester2`] instance.
|
||||
/// Configure and construct a [`TestContext`] containing an [`ingester`] instance.
|
||||
#[derive(Debug)]
|
||||
pub struct TestContextBuilder {
|
||||
wal_dir: Option<Arc<TempDir>>,
|
||||
|
@ -105,7 +105,7 @@ impl TestContextBuilder {
|
|||
self
|
||||
}
|
||||
|
||||
/// Initialise the [`ingester2`] instance and return a [`TestContext`] for it.
|
||||
/// Initialise the [`ingester`] instance and return a [`TestContext`] for it.
|
||||
pub async fn build(self) -> TestContext<impl IngesterRpcInterface> {
|
||||
let Self {
|
||||
wal_dir,
|
||||
|
@ -136,7 +136,7 @@ impl TestContextBuilder {
|
|||
|
||||
let (shutdown_tx, shutdown_rx) = oneshot::channel();
|
||||
|
||||
let ingester = ingester2::new(
|
||||
let ingester = ingester::new(
|
||||
Arc::clone(&catalog),
|
||||
Arc::clone(&metrics),
|
||||
persist_background_fetch_time,
|
||||
|
@ -164,9 +164,9 @@ impl TestContextBuilder {
|
|||
}
|
||||
}
|
||||
|
||||
/// A command interface to the underlying [`ingester2`] instance.
|
||||
/// A command interface to the underlying [`ingester`] instance.
|
||||
///
|
||||
/// When the [`TestContext`] is dropped, the underlying [`ingester2`] instance
|
||||
/// When the [`TestContext`] is dropped, the underlying [`ingester`] instance
|
||||
/// it controls is (ungracefully) stopped.
|
||||
#[derive(Debug)]
|
||||
pub struct TestContext<T> {
|
||||
|
|
|
@ -12,7 +12,7 @@ clap_blocks = { path = "../clap_blocks" }
|
|||
futures = "0.3.28"
|
||||
generated_types = { path = "../generated_types" }
|
||||
hyper = "0.14"
|
||||
ingester2 = { path = "../ingester2" }
|
||||
ingester = { path = "../ingester" }
|
||||
iox_catalog = { path = "../iox_catalog" }
|
||||
iox_query = { version = "0.1.0", path = "../iox_query" }
|
||||
ioxd_common = { path = "../ioxd_common" }
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use arrow_flight::flight_service_server::FlightServiceServer;
|
||||
use async_trait::async_trait;
|
||||
use clap_blocks::ingester2::Ingester2Config;
|
||||
use clap_blocks::ingester::IngesterConfig;
|
||||
use futures::FutureExt;
|
||||
use generated_types::influxdata::iox::{
|
||||
catalog::v1::catalog_service_server::CatalogServiceServer,
|
||||
|
@ -9,7 +9,7 @@ use generated_types::influxdata::iox::{
|
|||
},
|
||||
};
|
||||
use hyper::{Body, Request, Response};
|
||||
use ingester2::{IngesterGuard, IngesterRpcInterface};
|
||||
use ingester::{IngesterGuard, IngesterRpcInterface};
|
||||
use iox_catalog::interface::Catalog;
|
||||
use iox_query::exec::Executor;
|
||||
use ioxd_common::{
|
||||
|
@ -39,8 +39,8 @@ const MAX_OUTGOING_MSG_BYTES: usize = 1024 * 1024; // 1 MiB
|
|||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum Error {
|
||||
#[error("error initializing ingester2: {0}")]
|
||||
Ingester(#[from] ingester2::InitError),
|
||||
#[error("error initializing ingester: {0}")]
|
||||
Ingester(#[from] ingester::InitError),
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
@ -76,7 +76,7 @@ impl<I: IngesterRpcInterface> IngesterServerType<I> {
|
|||
|
||||
impl<I: IngesterRpcInterface> std::fmt::Debug for IngesterServerType<I> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "Ingester2")
|
||||
write!(f, "Ingester")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -84,7 +84,7 @@ impl<I: IngesterRpcInterface> std::fmt::Debug for IngesterServerType<I> {
|
|||
impl<I: IngesterRpcInterface + Sync + Send + Debug + 'static> ServerType for IngesterServerType<I> {
|
||||
/// Human name for this server type
|
||||
fn name(&self) -> &str {
|
||||
"ingester2"
|
||||
"ingester"
|
||||
}
|
||||
|
||||
/// Return the [`metric::Registry`] used by the ingester.
|
||||
|
@ -188,13 +188,13 @@ pub async fn create_ingester_server_type(
|
|||
common_state: &CommonServerState,
|
||||
catalog: Arc<dyn Catalog>,
|
||||
metrics: Arc<Registry>,
|
||||
ingester_config: &Ingester2Config,
|
||||
ingester_config: &IngesterConfig,
|
||||
exec: Arc<Executor>,
|
||||
object_store: ParquetStorage,
|
||||
) -> Result<Arc<dyn ServerType>> {
|
||||
let (shutdown_tx, shutdown_rx) = oneshot::channel();
|
||||
|
||||
let grpc = ingester2::new(
|
||||
let grpc = ingester::new(
|
||||
catalog,
|
||||
Arc::clone(&metrics),
|
||||
PERSIST_BACKGROUND_FETCH_TIME,
|
||||
|
|
|
@ -364,7 +364,7 @@ mod tests {
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn ingester2_uuid_file_counts() {
|
||||
async fn ingester_uuid_file_counts() {
|
||||
let (catalog, table, _partition) = make_catalog().await;
|
||||
let uuid = Uuid::new_v4();
|
||||
let table_id = table.table.id;
|
||||
|
|
|
@ -1359,7 +1359,7 @@ mod tests {
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn ingester2_uuid_completed_persistence_count() {
|
||||
async fn ingester_uuid_completed_persistence_count() {
|
||||
let ingester_uuid1 = Uuid::new_v4();
|
||||
let ingester_uuid2 = Uuid::new_v4();
|
||||
|
||||
|
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue