fix: Remove old querier

pull/24376/head
Carol (Nichols || Goulding) 2023-02-10 16:33:09 -05:00
parent fb6b3f66da
commit acf857816e
19 changed files with 188 additions and 1991 deletions


@ -1,50 +1,7 @@
//! Querier-related configs.
use crate::ingester_address::IngesterAddress;
use data_types::{IngesterMapping, ShardIndex};
use serde::Deserialize;
use snafu::{ResultExt, Snafu};
use std::{
collections::HashMap, fs, io, num::NonZeroUsize, path::PathBuf, str::FromStr, sync::Arc,
};
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
pub enum Error {
#[snafu(display("Could not read shard to ingester file `{}`: {source}", file.display()))]
ShardToIngesterFileReading { source: io::Error, file: PathBuf },
#[snafu(display("Could not deserialize JSON from ingester config: {source}"))]
ShardToIngesterDeserializing { source: serde_json::Error },
#[snafu(display(
"Specifying `\"ignoreAll\": true` requires that both the `ingesters` and \
`shards` configurations are empty. `ingesters`: `{:#?}`, `shards`: `{:#?}`",
ingesters,
shards,
))]
IgnoreAllRequiresEmptyConfig {
ingesters: HashMap<Arc<str>, Arc<IngesterConfig>>,
shards: HashMap<ShardIndex, ShardConfig>,
},
#[snafu(display(
"Ingester `{name}` must either set the `addr` to a non-empty value or set `ignore` to true"
))]
IngesterAddrRequired { name: Arc<str> },
#[snafu(display(
"Could not find ingester `{name}` specified for shard index `{shard_index}`"
))]
IngesterNotFound {
shard_index: ShardIndex,
name: Arc<str>,
},
#[snafu(context(false))]
IngesterAddress {
source: crate::ingester_address::Error,
},
}
use std::num::NonZeroUsize;
/// CLI config for querier configuration
#[derive(Debug, Clone, PartialEq, Eq, clap::Parser)]
@ -71,144 +28,6 @@ pub struct QuerierConfig {
)]
pub exec_mem_pool_bytes: usize,
/// Path to a JSON file containing a Shard index to ingesters gRPC mapping. For example:
///
/// ```json
/// {
/// // Flag to ignore all ingesters and only query persisted data. Useful for development
/// // or creating "cold data only" clusters.
/// //
/// // If this is set to `true`, having non-empty `ingesters` or `shards` is a startup
/// // error.
/// //
/// // default: false
/// "ignoreAll": false,
///
/// // Mapping of ingester name to config.
/// //
/// // default: {}
/// "ingesters": {
/// "i1": {
/// // Ingester address as URL.
/// //
/// // If this is `null` but `ignore` is false, it is an error.
/// //
/// // default: null
/// "addr": "http://ingester-1:1234"
/// },
/// "i2": {
/// // Flag to ignore this ingester at query time and not contact it.
/// //
/// // default: false
/// "ignore": true
/// }
/// },
///
/// // Mapping of shard indexes (as strings) to ingester names. Queries to shards that do
/// // not appear in this mapping will return an error. Using an ingester name in the
/// // `shards` mapping that does not appear in the `ingesters` mapping is a startup error.
/// //
/// // default: {}
/// "shards": {
/// "1": {
/// // Name of an ingester from the `ingester` mapping.
/// //
/// // If this is `null`, queries to this shard will error.
/// //
/// // default: null
/// "ingester": "i1"
/// },
/// "2": {
/// "ingester": "i1"
/// },
/// "3": {
/// "ingester": "i2"
/// },
/// "5": {
/// // Flag to not fetch data from any ingester for queries to this shard.
/// //
/// // default: false
/// "ignore": true
/// }
/// }
/// }
/// ```
#[clap(
long = "shard-to-ingesters-file",
env = "INFLUXDB_IOX_SHARD_TO_INGESTERS_FILE",
action
)]
pub shard_to_ingesters_file: Option<PathBuf>,
/// JSON containing a Shard index to ingesters gRPC mapping. For example:
///
/// ```json
/// {
/// // Flag to ignore all ingesters and only query persisted data. Useful for development
/// // or creating "cold data only" clusters.
/// //
/// // If this is set to `true`, having non-empty `ingesters` or `shards` is a startup
/// // error.
/// //
/// // default: false
/// "ignoreAll": false,
///
/// // Mapping of ingester name to config.
/// //
/// // default: {}
/// "ingesters": {
/// "i1": {
/// // Ingester address as URL.
/// //
/// // If this is `null` but `ignore` is false, it is an error.
/// //
/// // default: null
/// "addr": "http://ingester-1:1234"
/// },
/// "i2": {
/// // Flag to ignore this ingester at query time and not contact it.
/// //
/// // default: false
/// "ignore": true
/// }
/// },
///
/// // Mapping of shard indexes (as strings) to ingester names. Queries to shards that do
/// // not appear in this mapping will return an error. Using an ingester name in the
/// // `shards` mapping that does not appear in the `ingesters` mapping is a startup error.
/// //
/// // default: {}
/// "shards": {
/// "1": {
/// // Name of an ingester from the `ingester` mapping.
/// //
/// // If this is `null`, queries to this shard will error.
/// //
/// // default: null
/// "ingester": "i1"
/// },
/// "2": {
/// "ingester": "i1"
/// },
/// "3": {
/// "ingester": "i2"
/// },
/// "5": {
/// // Flag to not fetch data from any ingester for queries to this shard.
/// //
/// // default: false
/// "ignore": true
/// }
/// }
/// }
/// ```
#[clap(
long = "shard-to-ingesters",
env = "INFLUXDB_IOX_SHARD_TO_INGESTERS",
action
)]
pub shard_to_ingesters: Option<String>,
/// gRPC address for the router to talk with the ingesters. For
/// example:
///
@ -219,8 +38,14 @@ pub struct QuerierConfig {
/// "http://10.10.10.1:8083,http://10.10.10.2:8083"
///
/// for multiple addresses.
#[clap(long = "ingester-addresses", env = "INFLUXDB_IOX_INGESTER_ADDRESSES", num_args=1.., value_delimiter = ',')]
pub ingester_addresses: Vec<String>,
#[clap(
long = "ingester-addresses",
env = "INFLUXDB_IOX_INGESTER_ADDRESSES",
required = false,
num_args = 0..,
value_delimiter = ','
)]
pub ingester_addresses: Vec<IngesterAddress>,
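Since `IngesterAddress` is crate-internal, the behaviour of this comma-delimited flag can be sketched against a hypothetical mirror struct that uses `Vec<String>` instead; this assumes the `clap` crate with the `derive` and `env` features and is illustrative only:

```rust
// Hypothetical mirror of the flag above, purely to show how clap splits the
// comma-delimited value into multiple entries.
use clap::Parser;

#[derive(Debug, Parser)]
struct DemoConfig {
    /// Comma-separated ingester gRPC addresses.
    #[clap(
        long = "ingester-addresses",
        env = "INFLUXDB_IOX_INGESTER_ADDRESSES",
        required = false,
        num_args = 0..,
        value_delimiter = ','
    )]
    ingester_addresses: Vec<String>,
}

fn main() {
    // A single comma-delimited argument is split into two entries.
    let cfg = DemoConfig::parse_from([
        "demo",
        "--ingester-addresses",
        "http://10.10.10.1:8083,http://10.10.10.2:8083",
    ]);
    assert_eq!(cfg.ingester_addresses.len(), 2);
    assert_eq!(cfg.ingester_addresses[0], "http://10.10.10.1:8083");
}
```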
/// Size of the RAM cache used to store catalog metadata information in bytes.
#[clap(
@ -256,11 +81,12 @@ pub struct QuerierConfig {
/// returning results that do not include unpersisted data and enter "circuit breaker mode"
/// to avoid continually retrying the failing connection on subsequent queries.
///
/// If circuits are open, the querier will NOT contact the ingester and no unpersisted data will be presented to the user.
/// If circuits are open, the querier will NOT contact the ingester and no unpersisted data
/// will be presented to the user.
///
/// Circuits will switch to "half open" after some jittered timeout and the querier will try to use the ingester in
/// question again. If this succeeds, we are back to normal, otherwise it will back off exponentially before trying
/// again (and again ...).
/// Circuits will switch to "half open" after some jittered timeout and the querier will try to
/// use the ingester in question again. If this succeeds, we are back to normal, otherwise it
/// will back off exponentially before trying again (and again ...).
///
/// In a production environment the `ingester_circuit_state` metric should be monitored.
#[clap(
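The closed/open/half-open cycle described in the circuit-breaker docs above can be illustrated with a small state machine. This is a sketch of the general pattern, not the querier's actual implementation; all names are made up, and the jitter is left as a placeholder:

```rust
use std::time::{Duration, Instant};

#[derive(Debug)]
enum CircuitState {
    /// Normal operation: requests are sent to the ingester.
    Closed { consecutive_errors: u64 },
    /// Too many errors: skip the ingester until the deadline passes.
    Open { until: Instant, backoff: Duration },
    /// Deadline passed: allow a single probe request.
    HalfOpen { backoff: Duration },
}

/// Placeholder for a jittered delay; a real implementation would randomize it.
fn jittered(d: Duration) -> Duration {
    d
}

fn on_result(state: CircuitState, ok: bool, error_threshold: u64) -> CircuitState {
    match (state, ok) {
        // Any success (including a half-open probe) returns to normal operation.
        (_, true) => CircuitState::Closed { consecutive_errors: 0 },
        // Failures while closed are counted; the circuit opens at the threshold.
        (CircuitState::Closed { consecutive_errors }, false) => {
            let n = consecutive_errors + 1;
            if n >= error_threshold {
                let backoff = Duration::from_secs(1);
                CircuitState::Open { until: Instant::now() + jittered(backoff), backoff }
            } else {
                CircuitState::Closed { consecutive_errors: n }
            }
        }
        // A failed probe re-opens the circuit with an exponentially longer backoff.
        (CircuitState::HalfOpen { backoff }, false) => {
            let backoff = backoff * 2;
            CircuitState::Open { until: Instant::now() + jittered(backoff), backoff }
        }
        // Failures while already open just keep it open.
        (open @ CircuitState::Open { .. }, false) => open,
    }
}

fn main() {
    let mut state = CircuitState::Closed { consecutive_errors: 0 };
    // Two consecutive failures with a threshold of 2 open the circuit.
    state = on_result(state, false, 2);
    state = on_result(state, false, 2);
    assert!(matches!(state, CircuitState::Open { .. }));
    // Once the jittered deadline passes, the caller would move to half-open
    // and send one probe request; a successful probe closes the circuit.
    state = CircuitState::HalfOpen { backoff: Duration::from_secs(1) };
    state = on_result(state, true, 2);
    assert!(matches!(state, CircuitState::Closed { .. }));
}
```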
@ -279,46 +105,6 @@ impl QuerierConfig {
self.num_query_threads
}
/// Return the querier config's ingester addresses. If `--shard-to-ingesters-file` is used to
/// specify a JSON file containing shard to ingester address mappings, this returns `Err` if
/// there are any problems reading, deserializing, or interpreting the file.
// When we have switched to using the RPC write path only, this method can be changed to be
// infallible as clap will handle failure to parse the list of strings.
//
// Switching into the RPC write path mode requires *both* the `INFLUXDB_IOX_RPC_MODE`
// environment variable to be specified *and* `--ingester-addresses` to be set in order to
// switch. Setting `INFLUXDB_IOX_RPC_MODE` and shard-to-ingesters mapping, or not setting
// `INFLUXDB_IOX_RPC_MODE` and setting ingester addresses, will panic.
pub fn ingester_addresses(&self) -> Result<IngesterAddresses, Error> {
if let Some(file) = &self.shard_to_ingesters_file {
let contents =
fs::read_to_string(file).context(ShardToIngesterFileReadingSnafu { file })?;
let map = deserialize_shard_ingester_map(&contents)?;
if map.is_empty() {
Ok(IngesterAddresses::None)
} else {
Ok(IngesterAddresses::ByShardIndex(map))
}
} else if let Some(contents) = &self.shard_to_ingesters {
let map = deserialize_shard_ingester_map(contents)?;
if map.is_empty() {
Ok(IngesterAddresses::None)
} else {
Ok(IngesterAddresses::ByShardIndex(map))
}
} else if !self.ingester_addresses.is_empty() {
Ok(IngesterAddresses::List(
self.ingester_addresses
.iter()
.map(|addr| IngesterAddress::from_str(addr))
.collect::<Result<Vec<_>, _>>()?,
))
} else {
Ok(IngesterAddresses::None)
}
}
/// Size of the RAM cache pool for metadata in bytes.
pub fn ram_pool_metadata_bytes(&self) -> usize {
self.ram_pool_metadata_bytes
@ -335,131 +121,18 @@ impl QuerierConfig {
}
}
fn deserialize_shard_ingester_map(
contents: &str,
) -> Result<HashMap<ShardIndex, IngesterMapping>, Error> {
let ingesters_config: IngestersConfig =
serde_json::from_str(contents).context(ShardToIngesterDeserializingSnafu)?;
if ingesters_config.ignore_all
&& (!ingesters_config.ingesters.is_empty() || !ingesters_config.shards.is_empty())
{
return IgnoreAllRequiresEmptyConfigSnafu {
ingesters: ingesters_config.ingesters,
shards: ingesters_config.shards,
}
.fail();
}
let mut ingester_mapping_by_name = HashMap::new();
for (name, config) in &ingesters_config.ingesters {
match (config.ignore, config.addr.as_ref()) {
(true, _) => {
ingester_mapping_by_name.insert(name, IngesterMapping::Ignore);
}
(false, None) => {
return IngesterAddrRequiredSnafu {
name: Arc::clone(name),
}
.fail();
}
(false, Some(addr)) if addr.is_empty() => {
return IngesterAddrRequiredSnafu {
name: Arc::clone(name),
}
.fail();
}
(false, Some(addr)) => {
ingester_mapping_by_name.insert(name, IngesterMapping::Addr(Arc::clone(addr)));
}
}
}
let mut map = HashMap::new();
for (shard_index, shard_config) in ingesters_config.shards {
if shard_config.ignore {
map.insert(shard_index, IngesterMapping::Ignore);
continue;
}
match shard_config.ingester {
Some(ingester) => match ingester_mapping_by_name.get(&ingester) {
Some(ingester_mapping) => {
map.insert(shard_index, ingester_mapping.clone());
}
None => {
return IngesterNotFoundSnafu {
name: Arc::clone(&ingester),
shard_index,
}
.fail();
}
},
None => {
map.insert(shard_index, IngesterMapping::NotMapped);
}
}
}
Ok(map)
}
/// Ingester addresses.
#[derive(Debug, PartialEq, Eq)]
pub enum IngesterAddresses {
/// A mapping from shard index to ingesters.
ByShardIndex(HashMap<ShardIndex, IngesterMapping>),
/// A list of ingester2 addresses.
List(Vec<IngesterAddress>),
/// No connections, meaning only persisted data should be used.
None,
}
#[derive(Debug, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
struct IngestersConfig {
#[serde(default)]
ignore_all: bool,
#[serde(default)]
ingesters: HashMap<Arc<str>, Arc<IngesterConfig>>,
#[serde(default)]
shards: HashMap<ShardIndex, ShardConfig>,
}
/// Ingester config.
#[derive(Debug, Deserialize)]
pub struct IngesterConfig {
addr: Option<Arc<str>>,
#[serde(default)]
ignore: bool,
}
/// Shard config.
#[derive(Debug, Deserialize)]
pub struct ShardConfig {
ingester: Option<Arc<str>>,
#[serde(default)]
ignore: bool,
}
#[cfg(test)]
mod tests {
use super::*;
use clap::Parser;
use test_helpers::assert_error;
use test_helpers::assert_contains;
#[test]
fn test_default() {
let actual = QuerierConfig::try_parse_from(["my_binary"]).unwrap();
assert_eq!(actual.num_query_threads(), None);
assert!(matches!(
actual.ingester_addresses().unwrap(),
IngesterAddresses::None,
));
assert!(actual.ingester_addresses.is_empty());
}
#[test]
@ -471,26 +144,25 @@ mod tests {
actual.num_query_threads(),
Some(NonZeroUsize::new(42).unwrap())
);
assert!(matches!(
actual.ingester_addresses().unwrap(),
IngesterAddresses::None,
));
}
#[test]
fn test_ingester_addresses_list() {
let actual = QuerierConfig::try_parse_from([
let querier = QuerierConfig::try_parse_from([
"my_binary",
"--ingester-addresses",
"http://ingester-0:8082,http://ingester-1:8082",
])
.unwrap();
let expected = IngesterAddresses::List(vec![
IngesterAddress::from_str("http://ingester-0:8082").unwrap(),
IngesterAddress::from_str("http://ingester-1:8082").unwrap(),
]);
assert_eq!(actual.ingester_addresses().unwrap(), expected);
let actual: Vec<_> = querier
.ingester_addresses
.iter()
.map(ToString::to_string)
.collect();
let expected = vec!["http://ingester-0:8082/", "http://ingester-1:8082/"];
assert_eq!(actual, expected);
}
#[test]
@ -500,285 +172,15 @@ mod tests {
"--ingester-addresses",
"\\ingester-0:8082",
])
.unwrap()
.ingester_addresses();
assert_error!(actual, Error::IngesterAddress { .. });
}
.unwrap_err()
.to_string();
#[test]
fn supply_json_value() {
let actual = QuerierConfig::try_parse_from([
"my_binary",
"--shard-to-ingesters",
r#"{
"ignoreAll": false,
"ingesters": {
"i1": {
"addr": "http://ingester-1:1234"
},
"i2": {
"ignore": true
},
"i3": {
"ignore": true,
"addr": "http://ingester-2:2345"
}
},
"shards": {
"1": {
"ingester": "i1"
},
"2": {
"ingester": "i2"
},
"5": {
"ignore": true
}
}
}"#,
])
.unwrap();
let expected = IngesterAddresses::ByShardIndex(
[
(
ShardIndex::new(1),
IngesterMapping::Addr("http://ingester-1:1234".into()),
),
(ShardIndex::new(2), IngesterMapping::Ignore),
(ShardIndex::new(5), IngesterMapping::Ignore),
]
.into_iter()
.collect(),
assert_contains!(
actual,
"error: \
invalid value '\\ingester-0:8082' \
for '--ingester-addresses [<INGESTER_ADDRESSES>...]': \
Invalid: invalid uri character"
);
assert_eq!(actual.ingester_addresses().unwrap(), expected);
}
#[test]
fn successful_deserialization() {
let contents = r#"{
"ignoreAll": false,
"ingesters": {
"i1": {
"addr": "http://ingester-1:1234"
},
"i2": {
"ignore": true
},
"i3": {
"ignore": true,
"addr": "http://ingester-2:2345"
}
},
"shards": {
"1": {
"ingester": "i1"
},
"2": {
"ingester": "i2"
},
"3": {
"ingester": "i1",
"ignore": true
},
"5": {
"ignore": true
}
}
}"#;
let map = deserialize_shard_ingester_map(contents).unwrap();
let expected = [
(
ShardIndex::new(1),
IngesterMapping::Addr("http://ingester-1:1234".into()),
),
(ShardIndex::new(2), IngesterMapping::Ignore),
(ShardIndex::new(3), IngesterMapping::Ignore),
(ShardIndex::new(5), IngesterMapping::Ignore),
]
.into_iter()
.collect();
assert_eq!(map, expected);
}
#[test]
fn unsuccessful_deserialization() {
let map = deserialize_shard_ingester_map("");
assert_error!(map, Error::ShardToIngesterDeserializing { .. });
}
#[test]
fn ignore_all_requires_empty_maps() {
let expected = HashMap::new();
let map = deserialize_shard_ingester_map(
r#"{
"ignoreAll": true
}"#,
);
assert_eq!(map.unwrap(), expected);
let map = deserialize_shard_ingester_map(
r#"{
"ignoreAll": true,
"ingesters": {},
"shards": {}
}"#,
);
assert_eq!(map.unwrap(), expected);
let map = deserialize_shard_ingester_map(
r#"{
"ignoreAll": true,
"ingesters": {
"i1": {
"addr": "http://ingester-1:1234"
}
},
"shards": {}
}"#,
);
assert_error!(map, Error::IgnoreAllRequiresEmptyConfig { .. });
let map = deserialize_shard_ingester_map(
r#"{
"ignoreAll": true,
"ingesters": {},
"shards": {
"1": {
"ingester": "i1"
}
}
}"#,
);
assert_error!(map, Error::IgnoreAllRequiresEmptyConfig { .. });
let map = deserialize_shard_ingester_map(
r#"{
"ignoreAll": true,
"ingesters": {
"i1": {
"addr": "http://ingester-1:1234"
}
},
"shards": {
"1": {
"ingester": "i1"
}
}
}"#,
);
assert_error!(map, Error::IgnoreAllRequiresEmptyConfig { .. });
}
#[test]
fn ingester_addr_must_be_specified_if_not_ignored() {
let map = deserialize_shard_ingester_map(
r#"{
"ingesters": {
"i1": {}
}
}"#,
);
assert_error!(map, Error::IngesterAddrRequired { ref name } if name.as_ref() == "i1");
let map = deserialize_shard_ingester_map(
r#"{
"ingesters": {
"i1": {
"addr": ""
}
}
}"#,
);
assert_error!(map, Error::IngesterAddrRequired { ref name } if name.as_ref() == "i1");
}
#[test]
fn ingester_must_be_found() {
let map = deserialize_shard_ingester_map(
r#"{
"ingesters": {},
"shards": {
"1": {
"ingester": "i1"
}
}
}"#,
);
assert_error!(
map,
Error::IngesterNotFound { shard_index, ref name }
if shard_index.get() == 1 && name.as_ref() == "i1"
);
let map = deserialize_shard_ingester_map(
r#"{
"ingesters": {},
"shards": {
"1": {
"ingester": ""
}
}
}"#,
);
assert_error!(
map,
Error::IngesterNotFound { shard_index, ref name }
if shard_index.get() == 1 && name.as_ref() == ""
);
}
#[test]
fn shard_to_ingester_varieties() {
let map = deserialize_shard_ingester_map(
r#"{
"ingesters": {
"i1": {
"addr": "http://ingester-1:1234"
}
},
"shards": {
"1": {
"ingester": "i1"
},
"2": {},
"3": {
"ingester": null
},
"4": {
"ignore": true
},
"5": {
"ignore": true,
"ingester": "i1"
},
"6": {
"ignore": true,
"ingester": null
}
}
}"#,
);
let expected = [
(
ShardIndex::new(1),
IngesterMapping::Addr("http://ingester-1:1234".into()),
),
(ShardIndex::new(2), IngesterMapping::NotMapped),
(ShardIndex::new(3), IngesterMapping::NotMapped),
(ShardIndex::new(4), IngesterMapping::Ignore),
(ShardIndex::new(5), IngesterMapping::Ignore),
(ShardIndex::new(6), IngesterMapping::Ignore),
]
.into_iter()
.collect();
assert_eq!(map.unwrap(), expected);
}
}


@ -270,19 +270,6 @@ impl std::str::FromStr for ShardIndex {
}
}
/// Potential configurations of ingester connections for the querier to associate with a shard.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IngesterMapping {
/// Deliberately not mapping this shard to an ingester. If the querier gets a query for
/// this shard, it should return an error.
NotMapped,
/// Deliberately not contacting ingesters for this shard. If the querier gets a query for
/// this shard, it should only return persisted data.
Ignore,
/// The address of the ingester to contact for this shard.
Addr(Arc<str>),
}
/// Unique ID for a `Partition`
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type, sqlx::FromRow)]
#[sqlx(transparent)]


@ -11,7 +11,7 @@ use clap_blocks::{
ingester2::Ingester2Config,
ingester_address::IngesterAddress,
object_store::{make_object_store, ObjectStoreConfig},
querier::{IngesterAddresses, QuerierConfig},
querier::QuerierConfig,
router2::Router2Config,
run_config::RunConfig,
socket_addr::SocketAddr,
@ -425,6 +425,9 @@ impl Config {
CatalogDsnConfig::new_sqlite(local_catalog_path)
};
let ingester_addresses =
vec![IngesterAddress::from_str(&ingester_grpc_bind_address.to_string()).unwrap()];
let router_run_config = RunConfig::new(
logging_config,
tracing_config,
@ -458,10 +461,7 @@ impl Config {
let router_config = Router2Config {
query_pool_name: QUERY_POOL_NAME.to_string(),
http_request_limit: 1_000,
ingester_addresses: vec![IngesterAddress::from_str(
&ingester_grpc_bind_address.to_string(),
)
.unwrap()],
ingester_addresses: ingester_addresses.clone(),
new_namespace_retention_hours: None, // infinite retention
namespace_autocreation_enabled: true,
partition_key_pattern: "%Y-%m-%d".to_string(),
@ -498,10 +498,8 @@ impl Config {
};
let querier_config = QuerierConfig {
num_query_threads: None, // will be ignored
shard_to_ingesters_file: None, // will be ignored
shard_to_ingesters: None, // will be ignored
ingester_addresses: vec![ingester_grpc_bind_address.to_string()], // will be ignored
num_query_threads: None, // will be ignored
ingester_addresses,
ram_pool_metadata_bytes: querier_ram_pool_metadata_bytes,
ram_pool_data_bytes: querier_ram_pool_data_bytes,
max_concurrent_queries: querier_max_concurrent_queries,
@ -660,12 +658,7 @@ pub async fn command(config: Config) -> Result<()> {
)
.await;
let ingester_addresses = IngesterAddresses::List(vec![IngesterAddress::from_str(
&ingester_run_config.grpc_bind_address.to_string(),
)
.unwrap()]);
info!(?ingester_addresses, "starting querier");
info!(ingester_addresses = ?querier_config.ingester_addresses, "starting querier");
let querier = create_querier_server_type(QuerierServerTypeArgs {
common_state: &common_state,
metric_registry: Arc::clone(&metrics),
@ -673,9 +666,7 @@ pub async fn command(config: Config) -> Result<()> {
object_store,
exec,
time_provider,
ingester_addresses,
querier_config,
rpc_write: true,
authz: authz.as_ref().map(Arc::clone),
})
.await?;


@ -29,9 +29,6 @@ pub enum Error {
#[error("Invalid config: {0}")]
InvalidConfigCommon(#[from] CommonServerStateError),
#[error("Invalid config: {0}")]
InvalidConfigIngester(#[from] clap_blocks::querier::Error),
#[error("Catalog error: {0}")]
Catalog(#[from] iox_catalog::interface::Error),
@ -120,7 +117,7 @@ pub async fn command(config: Config) -> Result<(), Error> {
info!("using the write buffer path");
}
let ingester_addresses = config.querier_config.ingester_addresses()?;
let ingester_addresses = &config.querier_config.ingester_addresses;
info!(?ingester_addresses, "using ingester addresses");
let exec = Arc::new(Executor::new(
@ -135,9 +132,7 @@ pub async fn command(config: Config) -> Result<(), Error> {
object_store,
exec,
time_provider,
ingester_addresses,
querier_config: config.querier_config,
rpc_write,
authz: authz.as_ref().map(Arc::clone),
})
.await?;


@ -40,58 +40,6 @@ fn ingester2_errors_without_mode_env_var() {
));
}
#[test]
fn querier_errors_with_mode_env_var_and_shard_to_ingester_mapping() {
let shard_to_ingesters_json = r#"{
"ingesters": {
"i1": {
"addr": "arbitrary"
}
},
"shards": {
"0": {
"ingester": "i1"
}
}
}"#;
Command::cargo_bin("influxdb_iox")
.unwrap()
.env_clear()
.env("INFLUXDB_IOX_RPC_MODE", "2")
.arg("run")
.arg("querier")
.arg("--shard-to-ingesters")
.arg(shard_to_ingesters_json)
.arg("--catalog")
.arg("memory")
.timeout(Duration::from_secs(2))
.assert()
.failure()
.stderr(predicate::str::contains(
"`INFLUXDB_IOX_RPC_MODE` is set but shard to ingester mappings were provided",
));
}
#[test]
fn querier_errors_without_mode_env_var_and_ingester_addresses() {
Command::cargo_bin("influxdb_iox")
.unwrap()
.env_clear()
.arg("run")
.arg("querier")
.arg("--ingester-addresses")
.arg("http://arbitrary:8082")
.arg("--catalog")
.arg("memory")
.timeout(Duration::from_secs(2))
.assert()
.failure()
.stderr(predicate::str::contains(
"`INFLUXDB_IOX_RPC_MODE` is unset but ingester addresses were provided",
));
}
#[test]
fn querier_without_ingesters_without_mode_env_var_uses_write_buffer() {
Command::cargo_bin("influxdb_iox")


@ -1,6 +1,6 @@
use async_trait::async_trait;
use authz::Authorizer;
use clap_blocks::querier::{IngesterAddresses, QuerierConfig};
use clap_blocks::querier::QuerierConfig;
use datafusion_util::config::register_iox_object_store;
use hyper::{Body, Request, Response};
use iox_catalog::interface::Catalog;
@ -159,9 +159,7 @@ pub struct QuerierServerTypeArgs<'a> {
pub object_store: Arc<DynObjectStore>,
pub exec: Arc<Executor>,
pub time_provider: Arc<dyn TimeProvider>,
pub ingester_addresses: IngesterAddresses,
pub querier_config: QuerierConfig,
pub rpc_write: bool,
pub authz: Option<Arc<dyn Authorizer>>,
}
@ -199,36 +197,20 @@ pub async fn create_querier_server_type(
);
assert!(existing.is_none());
let ingester_connection = match args.ingester_addresses {
IngesterAddresses::None => None,
IngesterAddresses::ByShardIndex(map) => {
if args.rpc_write {
panic!(
"`INFLUXDB_IOX_RPC_MODE` is set but shard to ingester mappings were provided; \
either unset `INFLUXDB_IOX_RPC_MODE` or specify `--ingester-addresses` instead"
);
}
Some(create_ingester_connections(
Some(map),
None,
Arc::clone(&catalog_cache),
args.querier_config.ingester_circuit_breaker_threshold,
))
}
IngesterAddresses::List(list) => {
if !args.rpc_write {
panic!(
"`INFLUXDB_IOX_RPC_MODE` is unset but ingester addresses were provided; \
either set `INFLUXDB_IOX_RPC_MODE` or specify shard to ingester mappings instead"
);
}
Some(create_ingester_connections(
None,
Some(list.iter().map(|addr| addr.to_string().into()).collect()),
Arc::clone(&catalog_cache),
args.querier_config.ingester_circuit_breaker_threshold,
))
}
let ingester_connections = if args.querier_config.ingester_addresses.is_empty() {
None
} else {
let ingester_addresses = args
.querier_config
.ingester_addresses
.iter()
.map(|addr| addr.to_string().into())
.collect();
Some(create_ingester_connections(
ingester_addresses,
Arc::clone(&catalog_cache),
args.querier_config.ingester_circuit_breaker_threshold,
))
};
let database = Arc::new(
@ -236,9 +218,8 @@ pub async fn create_querier_server_type(
catalog_cache,
Arc::clone(&args.metric_registry),
args.exec,
ingester_connection,
ingester_connections,
args.querier_config.max_concurrent_queries(),
args.rpc_write,
)
.await?,
);


@ -133,7 +133,6 @@ mod tests {
catalog.exec(),
Some(create_ingester_connection_for_testing()),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
true,
)
.await
.unwrap(),
@ -169,7 +168,6 @@ mod tests {
catalog.exec(),
Some(create_ingester_connection_for_testing()),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
true,
)
.await
.unwrap(),


@ -11,10 +11,9 @@ use cache_system::{
loader::{metrics::MetricsLoader, FunctionLoader},
resource_consumption::FunctionEstimator,
};
use data_types::{ParquetFile, SequenceNumber, TableId};
use data_types::{ParquetFile, TableId};
use iox_catalog::interface::Catalog;
use iox_time::TimeProvider;
use observability_deps::tracing::debug;
use snafu::{ResultExt, Snafu};
use std::{collections::HashMap, mem, sync::Arc};
use trace::span::Span;
@ -33,7 +32,7 @@ pub enum Error {
},
}
type IngesterCounts = Option<Arc<Vec<(Uuid, u64)>>>;
type IngesterCounts = Arc<Vec<(Uuid, u64)>>;
/// Holds catalog information about a parquet file
#[derive(Debug)]
@ -77,19 +76,17 @@ impl CachedParquetFiles {
// Note size_of_val is the size of the Arc
// https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=ae8fee8b4f7f5f013dc01ea1fda165da
// size of the Arc+(Option+HashMap) itself
// size of the Arc+(HashMap) itself
mem::size_of_val(self) +
// Vec overhead
mem::size_of_val(self.files.as_ref()) +
// size of the underlying parquet files
self.files.iter().map(|f| f.size()).sum::<usize>() +
// hashmap data
self.persisted_file_counts_from_ingesters.as_ref().map(|map| std::mem::size_of_val(map.as_ref()) + map.capacity() * mem::size_of::<(Uuid, u64)>()).unwrap_or_default()
}
/// Returns the greatest parquet sequence number stored in this cache entry
pub(crate) fn max_parquet_sequence_number(&self) -> Option<SequenceNumber> {
self.files.iter().map(|f| f.max_sequence_number).max()
std::mem::size_of_val(self.persisted_file_counts_from_ingesters.as_ref()) +
self.persisted_file_counts_from_ingesters
.as_ref()
.capacity() * mem::size_of::<(Uuid, u64)>()
}
}
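The accounting above follows a common pattern: charge for the container itself plus whatever its backing buffer has reserved, which is why `capacity()` is used rather than the number of entries actually present. A generic sketch of that pattern (not the exact struct above; `(u128, u64)` stands in for the `(Uuid, u64)` pairs):

```rust
use std::mem;

// Stand-in for the cached per-ingester counts.
struct Counts {
    entries: Vec<(u128, u64)>,
}

impl Counts {
    /// RAM usage estimate: the struct itself (the Vec header) plus the heap
    /// buffer the Vec has reserved, whether or not every slot is filled.
    fn size(&self) -> usize {
        mem::size_of_val(self) + self.entries.capacity() * mem::size_of::<(u128, u64)>()
    }
}

fn main() {
    let c = Counts { entries: Vec::with_capacity(4) };
    // Capacity, not length, drives the estimate: the buffer is reserved even
    // though no entries have been pushed yet.
    assert_eq!(
        c.size(),
        mem::size_of::<Counts>() + 4 * mem::size_of::<(u128, u64)>()
    );
}
```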
@ -130,7 +127,7 @@ impl ParquetFileCache {
async move {
Backoff::new(&backoff_config)
.retry_all_errors("get parquet_files", || {
let extra = extra.clone();
let extra = Arc::clone(&extra);
async {
// TODO refreshing all parquet files for the
// entire table is likely to be quite wasteful
@ -202,24 +199,6 @@ impl ParquetFileCache {
///
/// # Expiration
///
/// If a `max_parquet_sequence_number` is specified, assume we are in the write buffer path.
/// If `max_parquet_sequence_number` is `None`, check to see if
/// `persisted_file_counts_by_ingester_uuid` is specified, which means we are in the RPC write
/// path and need to check ingester2 UUIDs and their associated file counts returned from the
/// ingester requests.
///
/// ## Write Buffer path (based on sequence number)
///
/// Clear the Parquet file cache if the cache does not contain any files that have the
/// specified `max_parquet_sequence_number`.
///
/// Returns true if the cache was cleared (it will be refreshed on the next call to get).
///
/// If a `max_parquet_sequence_number` is supplied that is not in our cache, it means the
/// ingester has written new data to the catalog and the cache is out of date.
///
/// ## RPC write path (based on ingester UUIDs and persisted file counts)
///
/// Clear the Parquet file cache if the information from the ingesters contains an ingester
/// UUID we have never seen before (which indicates an ingester started or restarted) or if an
/// ingester UUID we *have* seen before reports a different number of persisted Parquet files
@ -228,60 +207,30 @@ impl ParquetFileCache {
pub async fn get(
&self,
table_id: TableId,
max_parquet_sequence_number: Option<SequenceNumber>,
persisted_file_counts_by_ingester_uuid: Option<HashMap<Uuid, u64>>,
persisted_file_counts_by_ingester_uuid: HashMap<Uuid, u64>,
span: Option<Span>,
) -> Arc<CachedParquetFiles> {
let persisted_file_counts_by_ingester_uuid =
persisted_file_counts_by_ingester_uuid.map(|map| {
let mut entries = map.into_iter().collect::<Vec<_>>();
entries.sort();
entries.shrink_to_fit();
Arc::new(entries)
});
let mut entries = persisted_file_counts_by_ingester_uuid
.into_iter()
.collect::<Vec<_>>();
entries.sort();
entries.shrink_to_fit();
let persisted_file_counts_by_ingester_uuid = Arc::new(entries);
let persisted_file_counts_by_ingester_uuid_captured =
persisted_file_counts_by_ingester_uuid.clone();
Arc::clone(&persisted_file_counts_by_ingester_uuid);
self.remove_if_handle
.remove_if_and_get(
&self.cache,
table_id,
|cached_file| {
if let Some(max_parquet_sequence_number) = max_parquet_sequence_number {
let max_cached = cached_file.max_parquet_sequence_number();
let expire = if let Some(max_cached) = max_cached {
max_cached < max_parquet_sequence_number
} else {
// a max sequence was provided but there were no
// files in the cache. Means we need to refresh
true
};
debug!(
expire,
?max_cached,
max_parquet_sequence_number = max_parquet_sequence_number.get(),
table_id = table_id.get(),
"expire parquet file cache",
);
expire
} else if let Some(ingester_counts) =
&persisted_file_counts_by_ingester_uuid_captured
{
// If there's new or different information about the ingesters or the
// number of files they've persisted, we need to refresh.
different(
cached_file
.persisted_file_counts_from_ingesters
.as_ref()
.map(|x| x.as_ref().as_ref()),
ingester_counts,
)
} else {
false
}
// If there's new or different information about the ingesters or the
// number of files they've persisted, we need to refresh.
different(
cached_file.persisted_file_counts_from_ingesters.as_ref(),
&persisted_file_counts_by_ingester_uuid_captured,
)
},
(persisted_file_counts_by_ingester_uuid, span),
)
@ -295,10 +244,10 @@ impl ParquetFileCache {
}
}
fn different(stored_counts: Option<&[(Uuid, u64)]>, ingester_counts: &[(Uuid, u64)]) -> bool {
fn different(stored_counts: &[(Uuid, u64)], ingester_counts: &[(Uuid, u64)]) -> bool {
// If we have some information stored for this table,
if let Some(stored) = stored_counts {
ingester_counts != stored
if !stored_counts.is_empty() {
ingester_counts != stored_counts
} else {
// Otherwise, we've never seen ingester file counts for this table.
// If the hashmap we got is empty, then we still haven't gotten any information, so we
@ -314,10 +263,7 @@ mod tests {
use super::*;
use data_types::{ColumnType, ParquetFileId};
use iox_tests::{
TestCatalog, TestNamespace, TestParquetFile, TestParquetFileBuilder, TestPartition,
TestTable,
};
use iox_tests::{TestCatalog, TestNamespace, TestParquetFileBuilder, TestPartition, TestTable};
use crate::cache::{ram::test_util::test_ram_pool, test_util::assert_histogram_metric_count};
@ -332,7 +278,7 @@ mod tests {
let tfile = partition.create_parquet_file(builder).await;
let cache = make_cache(&catalog);
let cached_files = cache.get(table.table.id, None, None, None).await.vec();
let cached_files = cache.get(table.table.id, HashMap::new(), None).await.vec();
assert_eq!(cached_files.len(), 1);
let expected_parquet_file = &tfile.parquet_file;
@ -340,7 +286,7 @@ mod tests {
// validate a second request doesn't result in a catalog request
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
cache.get(table.table.id, None, None, None).await;
cache.get(table.table.id, HashMap::new(), None).await;
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
}
@ -359,12 +305,12 @@ mod tests {
let cache = make_cache(&catalog);
let cached_files = cache.get(table1.table.id, None, None, None).await.vec();
let cached_files = cache.get(table1.table.id, HashMap::new(), None).await.vec();
assert_eq!(cached_files.len(), 1);
let expected_parquet_file = &tfile1.parquet_file;
assert_eq!(cached_files[0].as_ref(), expected_parquet_file);
let cached_files = cache.get(table2.table.id, None, None, None).await.vec();
let cached_files = cache.get(table2.table.id, HashMap::new(), None).await.vec();
assert_eq!(cached_files.len(), 1);
let expected_parquet_file = &tfile2.parquet_file;
assert_eq!(cached_files[0].as_ref(), expected_parquet_file);
@ -380,7 +326,7 @@ mod tests {
let different_catalog = TestCatalog::new();
let cache = make_cache(&different_catalog);
let cached_files = cache.get(table.table.id, None, None, None).await.vec();
let cached_files = cache.get(table.table.id, HashMap::new(), None).await.vec();
assert!(cached_files.is_empty());
}
@ -391,96 +337,22 @@ mod tests {
partition.create_parquet_file(builder).await;
let table_id = table.table.id;
let single_file_size = 232;
let two_file_size = 424;
let single_file_size = 256;
let two_file_size = 448;
assert!(single_file_size < two_file_size);
let cache = make_cache(&catalog);
let cached_files = cache.get(table_id, None, None, None).await;
let cached_files = cache.get(table_id, HashMap::new(), None).await;
assert_eq!(cached_files.size(), single_file_size);
// add a second file, and force the cache to find it
let builder = TestParquetFileBuilder::default().with_line_protocol(TABLE1_LINE_PROTOCOL);
partition.create_parquet_file(builder).await;
cache.expire(table_id);
let cached_files = cache.get(table_id, None, None, None).await;
let cached_files = cache.get(table_id, HashMap::new(), None).await;
assert_eq!(cached_files.size(), two_file_size);
}
#[tokio::test]
async fn test_max_persisted_sequence_number() {
let (catalog, table, partition) = make_catalog().await;
let _sequence_number_1 = SequenceNumber::new(1);
let sequence_number_2 = SequenceNumber::new(2);
let sequence_number_3 = SequenceNumber::new(3);
let sequence_number_10 = SequenceNumber::new(10);
let builder = TestParquetFileBuilder::default()
.with_line_protocol(TABLE1_LINE_PROTOCOL)
.with_max_seq(sequence_number_2.get())
.with_min_time(0)
.with_max_time(100);
let tfile1_2 = partition.create_parquet_file(builder).await;
let builder = TestParquetFileBuilder::default()
.with_line_protocol(TABLE1_LINE_PROTOCOL)
.with_max_seq(sequence_number_3.get())
.with_min_time(0)
.with_max_time(100);
let tfile1_3 = partition.create_parquet_file(builder).await;
let cache = make_cache(&catalog);
let table_id = table.table.id;
assert_eq!(
cache.get(table_id, None, None, None).await.ids(),
ids(&[&tfile1_2, &tfile1_3])
);
// simulate request with sequence number 2
// should not expire anything
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
assert_eq!(
cache
.get(table_id, Some(sequence_number_2), None, None)
.await
.ids(),
ids(&[&tfile1_2, &tfile1_3])
);
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
// simulate request with no sequence number
// should not expire anything
assert_eq!(
cache.get(table_id, None, None, None).await.ids(),
ids(&[&tfile1_2, &tfile1_3])
);
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
// new file is created, but cache is stale
let builder = TestParquetFileBuilder::default()
.with_line_protocol(TABLE1_LINE_PROTOCOL)
.with_max_seq(sequence_number_10.get())
.with_min_time(0)
.with_max_time(100);
let tfile1_10 = partition.create_parquet_file(builder).await;
// cache doesn't have tfile1_10
assert_eq!(
cache.get(table_id, None, None, None).await.ids(),
ids(&[&tfile1_2, &tfile1_3])
);
// new request includes sequence 10 and causes a cache refresh
// now cache has tfile1_10 (yay!)
assert_eq!(
cache
.get(table_id, Some(sequence_number_10), None, None)
.await
.ids(),
ids(&[&tfile1_2, &tfile1_3, &tfile1_10])
);
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 2);
}
#[tokio::test]
async fn ingester2_uuid_file_counts() {
let (catalog, table, _partition) = make_catalog().await;
@ -489,91 +361,39 @@ mod tests {
let cache = make_cache(&catalog);
// No metadata: make one request that should be cached
cache.get(table_id, None, None, None).await;
cache.get(table_id, HashMap::new(), None).await;
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
cache.get(table_id, None, None, None).await;
cache.get(table_id, HashMap::new(), None).await;
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
// Empty metadata: make one request, should still be cached
cache.get(table_id, None, Some(HashMap::new()), None).await;
cache.get(table_id, HashMap::new(), None).await;
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
// See a new UUID: refresh the cache
cache
.get(table_id, None, Some(HashMap::from([(uuid, 3)])), None)
.await;
cache.get(table_id, HashMap::from([(uuid, 3)]), None).await;
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 2);
// See the same UUID with the same count: should still be cached
cache
.get(table_id, None, Some(HashMap::from([(uuid, 3)])), None)
.await;
cache.get(table_id, HashMap::from([(uuid, 3)]), None).await;
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 2);
// See the same UUID with a different count: refresh the cache
cache
.get(table_id, None, Some(HashMap::from([(uuid, 4)])), None)
.await;
cache.get(table_id, HashMap::from([(uuid, 4)]), None).await;
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 3);
// Empty metadata again: still use the cache
cache.get(table_id, None, Some(HashMap::new()), None).await;
cache.get(table_id, HashMap::new(), None).await;
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 4);
// See a new UUID and not the old one: refresh the cache
let new_uuid = Uuid::new_v4();
cache
.get(table_id, None, Some(HashMap::from([(new_uuid, 1)])), None)
.get(table_id, HashMap::from([(new_uuid, 1)]), None)
.await;
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 5);
}
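The refresh rule this test exercises can also be written out as a standalone sketch. This mirrors (rather than reuses) the `different` helper above, with hypothetical values, a hypothetical name, and `Uuid` from the `uuid` crate:

```rust
use uuid::Uuid;

/// Refresh when the reported ingester counts differ from what is cached,
/// except that an empty report never forces a refresh on its own.
fn needs_refresh(stored: &[(Uuid, u64)], reported: &[(Uuid, u64)]) -> bool {
    if !stored.is_empty() {
        // We have seen counts before: any new UUID, missing UUID, or changed
        // persisted-file count means the cached Parquet file list may be stale.
        reported != stored
    } else {
        // Nothing cached yet: only refresh once the ingesters report something.
        !reported.is_empty()
    }
}

fn main() {
    let ingester = Uuid::new_v4();
    assert!(!needs_refresh(&[], &[])); // no information yet: keep the cache
    assert!(needs_refresh(&[], &[(ingester, 3)])); // first report: refresh
    assert!(!needs_refresh(&[(ingester, 3)], &[(ingester, 3)])); // unchanged: keep the cache
    assert!(needs_refresh(&[(ingester, 3)], &[(ingester, 4)])); // new persisted file: refresh
}
```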
#[tokio::test]
async fn test_expire_empty() {
let (catalog, table, partition) = make_catalog().await;
let cache = make_cache(&catalog);
let table_id = table.table.id;
// no parquet files, should be none
assert!(cache.get(table_id, None, None, None).await.files.is_empty());
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
// second request should be cached
assert!(cache.get(table_id, None, None, None).await.files.is_empty());
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
// Calls to expire if there is no known persisted file, should still be cached
assert!(cache.get(table_id, None, None, None).await.files.is_empty());
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
// make a new parquet file
let sequence_number_1 = SequenceNumber::new(1);
let builder = TestParquetFileBuilder::default()
.with_line_protocol(TABLE1_LINE_PROTOCOL)
.with_max_seq(sequence_number_1.get())
.with_min_time(0)
.with_max_time(100);
let tfile = partition.create_parquet_file(builder).await;
// cache is stale
assert!(cache.get(table_id, None, None, None).await.files.is_empty());
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
// Now call to expire with knowledge of new file, will cause a cache refresh
assert_eq!(
cache
.get(table_id, Some(sequence_number_1), None, None)
.await
.ids(),
ids(&[&tfile])
);
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 2);
}
fn ids(files: &[&TestParquetFile]) -> HashSet<ParquetFileId> {
files.iter().map(|f| f.parquet_file.id).collect()
}
/// Extracts parquet ids from various objects
trait ParquetIds {
fn ids(&self) -> HashSet<ParquetFileId>;


@ -6,13 +6,12 @@ use crate::{
};
use async_trait::async_trait;
use backoff::{Backoff, BackoffConfig};
use data_types::{Namespace, ShardIndex};
use iox_catalog::interface::{Catalog, SoftDeletedRows};
use data_types::Namespace;
use iox_catalog::interface::SoftDeletedRows;
use iox_query::exec::Executor;
use service_common::QueryNamespaceProvider;
use sharder::JumpHash;
use snafu::Snafu;
use std::{collections::BTreeSet, sync::Arc};
use std::sync::Arc;
use trace::span::{Span, SpanRecorder};
use tracker::{
AsyncSemaphoreMetrics, InstrumentedAsyncOwnedSemaphorePermit, InstrumentedAsyncSemaphore,
@ -30,8 +29,6 @@ pub enum Error {
Catalog {
source: iox_catalog::interface::Error,
},
#[snafu(display("No shards loaded"))]
NoShards,
}
/// Database for the querier.
@ -68,10 +65,6 @@ pub struct QuerierDatabase {
/// If the same namespace is requested twice for different queries, it is counted twice.
query_execution_semaphore: Arc<InstrumentedAsyncSemaphore>,
/// Sharder to determine which ingesters to query for a particular table and namespace.
/// Only relevant when using the write buffer; will be None if using RPC write ingesters.
sharder: Option<Arc<JumpHash<Arc<ShardIndex>>>>,
/// Chunk prune metrics.
prune_metrics: Arc<PruneMetrics>,
}
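The `query_execution_semaphore` field above gates how many queries run at once. A rough sketch of that pattern, using tokio's plain `Semaphore` in place of the crate's `InstrumentedAsyncSemaphore` (names and numbers are illustrative; assumes the `tokio` crate with its default features):

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // Comparable to max_concurrent_queries: at most two permits exist.
    let semaphore = Arc::new(Semaphore::new(2));

    let mut handles = Vec::new();
    for i in 0..4 {
        let semaphore = Arc::clone(&semaphore);
        handles.push(tokio::spawn(async move {
            // Each query holds a permit for its whole execution; queries beyond
            // the limit wait here until a permit is released.
            let _permit = semaphore.acquire_owned().await.expect("semaphore closed");
            println!("query {i} executing");
        }));
    }
    for handle in handles {
        handle.await.expect("task panicked");
    }
}
```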
@ -107,7 +100,6 @@ impl QuerierDatabase {
exec: Arc<Executor>,
ingester_connection: Option<Arc<dyn IngesterConnection>>,
max_concurrent_queries: usize,
rpc_write: bool,
) -> Result<Self, Error> {
assert!(
max_concurrent_queries <= Self::MAX_CONCURRENT_QUERIES_MAX,
@ -121,7 +113,6 @@ impl QuerierDatabase {
let chunk_adapter = Arc::new(ChunkAdapter::new(
Arc::clone(&catalog_cache),
Arc::clone(&metric_registry),
rpc_write,
));
let query_log = Arc::new(QueryLog::new(QUERY_LOG_SIZE, catalog_cache.time_provider()));
let semaphore_metrics = Arc::new(AsyncSemaphoreMetrics::new(
@ -131,14 +122,6 @@ impl QuerierDatabase {
let query_execution_semaphore =
Arc::new(semaphore_metrics.new_semaphore(max_concurrent_queries));
let sharder = if rpc_write {
None
} else {
Some(Arc::new(
create_sharder(catalog_cache.catalog().as_ref(), backoff_config.clone()).await?,
))
};
let prune_metrics = Arc::new(PruneMetrics::new(&metric_registry));
Ok(Self {
@ -150,7 +133,6 @@ impl QuerierDatabase {
ingester_connection,
query_log,
query_execution_semaphore,
sharder,
prune_metrics,
})
}
@ -179,7 +161,6 @@ impl QuerierDatabase {
Arc::clone(&self.exec),
self.ingester_connection.clone(),
Arc::clone(&self.query_log),
self.sharder.clone(),
Arc::clone(&self.prune_metrics),
)))
}
@ -211,41 +192,11 @@ impl QuerierDatabase {
}
}
pub async fn create_sharder(
catalog: &dyn Catalog,
backoff_config: BackoffConfig,
) -> Result<JumpHash<Arc<ShardIndex>>, Error> {
let shards = Backoff::new(&backoff_config)
.retry_all_errors("get shards", || async {
catalog.repositories().await.shards().list().await
})
.await
.expect("retry forever");
// Construct the (ordered) set of shard indexes.
//
// The sort order must be deterministic in order for all nodes to shard to
// the same indexes, therefore we type assert the returned set is of the
// ordered variety.
let shard_indexes: BTreeSet<_> = shards
// ^ don't change this to an unordered set
.into_iter()
.map(|shard| shard.shard_index)
.collect();
if shard_indexes.is_empty() {
return Err(Error::NoShards);
}
Ok(JumpHash::new(shard_indexes.into_iter().map(Arc::new)))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::create_ingester_connection_for_testing;
use iox_tests::TestCatalog;
use test_helpers::assert_error;
use tokio::runtime::Handle;
#[tokio::test]
@ -268,38 +219,11 @@ mod tests {
catalog.exec(),
Some(create_ingester_connection_for_testing()),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX.saturating_add(1),
true,
)
.await
.unwrap();
}
#[tokio::test]
async fn shards_in_catalog_are_required_for_startup() {
let catalog = TestCatalog::new();
let catalog_cache = Arc::new(CatalogCache::new_testing(
catalog.catalog(),
catalog.time_provider(),
catalog.metric_registry(),
catalog.object_store(),
&Handle::current(),
));
assert_error!(
QuerierDatabase::new(
catalog_cache,
catalog.metric_registry(),
catalog.exec(),
Some(create_ingester_connection_for_testing()),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
false, // this only applies to the kafka write path
)
.await,
Error::NoShards
);
}
#[tokio::test]
async fn test_namespace() {
let catalog = TestCatalog::new();
@ -319,7 +243,6 @@ mod tests {
catalog.exec(),
Some(create_ingester_connection_for_testing()),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
true,
)
.await
.unwrap();
@ -349,7 +272,6 @@ mod tests {
catalog.exec(),
Some(create_ingester_connection_for_testing()),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
true,
)
.await
.unwrap();


@ -224,7 +224,6 @@ mod tests {
exec,
Some(create_ingester_connection_for_testing()),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
true,
)
.await
.unwrap(),


@ -13,8 +13,8 @@ use async_trait::async_trait;
use backoff::{Backoff, BackoffConfig, BackoffError};
use client_util::connection;
use data_types::{
ChunkId, ChunkOrder, DeletePredicate, IngesterMapping, NamespaceId, PartitionId,
SequenceNumber, ShardId, ShardIndex, TableSummary, TimestampMinMax,
ChunkId, ChunkOrder, DeletePredicate, NamespaceId, PartitionId, SequenceNumber, ShardId,
ShardIndex, TableSummary, TimestampMinMax,
};
use datafusion::error::DataFusionError;
use futures::{stream::FuturesUnordered, TryStreamExt};
@ -148,8 +148,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Create a new set of connections given ingester configurations
pub fn create_ingester_connections(
shard_to_ingesters: Option<HashMap<ShardIndex, IngesterMapping>>,
ingester_addresses: Option<Vec<Arc<str>>>,
ingester_addresses: Vec<Arc<str>>,
catalog_cache: Arc<CatalogCache>,
open_circuit_after_n_errors: u64,
) -> Arc<dyn IngesterConnection> {
@ -170,29 +169,13 @@ pub fn create_ingester_connections(
deadline: None,
};
// Exactly one of `shard_to_ingesters` or `ingester_addresses` must be specified.
// `shard_to_ingesters` uses the Kafka write buffer path.
// `ingester_addresses` uses the RPC write path.
match (shard_to_ingesters, ingester_addresses) {
(None, None) => panic!("Neither shard_to_ingesters nor ingester_addresses was specified!"),
(Some(_), Some(_)) => {
panic!("Both shard_to_ingesters and ingester_addresses were specified!")
}
(Some(shard_to_ingesters), None) => Arc::new(IngesterConnectionImpl::by_shard(
shard_to_ingesters,
catalog_cache,
retry_backoff_config,
circuit_breaker_backoff_config,
open_circuit_after_n_errors,
)),
(None, Some(ingester_addresses)) => Arc::new(IngesterConnectionImpl::by_addrs(
ingester_addresses,
catalog_cache,
retry_backoff_config,
circuit_breaker_backoff_config,
open_circuit_after_n_errors,
)),
}
Arc::new(IngesterConnectionImpl::by_addrs(
ingester_addresses,
catalog_cache,
retry_backoff_config,
circuit_breaker_backoff_config,
open_circuit_after_n_errors,
))
}
/// Create a new ingester suitable for testing
@ -204,14 +187,8 @@ pub fn create_ingester_connection_for_testing() -> Arc<dyn IngesterConnection> {
#[async_trait]
pub trait IngesterConnection: std::fmt::Debug + Send + Sync + 'static {
/// Returns all partitions ingester(s) know about for the specified table.
///
/// # Panics
///
/// Panics if the list of shard_indexes is empty.
#[allow(clippy::too_many_arguments)]
async fn partitions(
&self,
shard_indexes: Option<Vec<ShardIndex>>,
namespace_id: NamespaceId,
cached_table: Arc<CachedTable>,
columns: Vec<String>,
@ -345,7 +322,6 @@ impl<'a> Drop for ObserveIngesterRequest<'a> {
/// IngesterConnection that communicates with an ingester.
#[derive(Debug)]
pub struct IngesterConnectionImpl {
shard_to_ingesters: HashMap<ShardIndex, IngesterMapping>,
unique_ingester_addresses: HashSet<Arc<str>>,
flight_client: Arc<dyn IngesterFlightClient>,
catalog_cache: Arc<CatalogCache>,
@ -354,78 +330,7 @@ pub struct IngesterConnectionImpl {
}
impl IngesterConnectionImpl {
/// Create a new set of connections given a map of shard indexes to Ingester addresses, such as:
///
/// ```json
/// {
/// "shards": {
/// "0": {
/// "ingesters": [
/// {"addr": "http://ingester-0:8082"},
/// {"addr": "http://ingester-3:8082"}
/// ]
/// },
/// "1": { "ingesters": [{"addr": "http://ingester-1:8082"}]},
/// }
/// }
/// ```
pub fn by_shard(
shard_to_ingesters: HashMap<ShardIndex, IngesterMapping>,
catalog_cache: Arc<CatalogCache>,
retry_backoff_config: BackoffConfig,
circuit_breaker_backoff_config: BackoffConfig,
open_circuit_after_n_errors: u64,
) -> Self {
let flight_client = Arc::new(FlightClientImpl::new());
let flight_client = Arc::new(InvalidateOnErrorFlightClient::new(flight_client));
let flight_client = Arc::new(CircuitBreakerFlightClient::new(
flight_client,
catalog_cache.time_provider(),
catalog_cache.metric_registry(),
open_circuit_after_n_errors,
circuit_breaker_backoff_config,
));
Self::by_shard_with_flight_client(
shard_to_ingesters,
flight_client,
catalog_cache,
retry_backoff_config,
)
}
/// Create new set of connections with specific flight client implementation.
///
/// This is helpful for testing, i.e. when the flight client should not be backed by normal
/// network communication.
pub fn by_shard_with_flight_client(
shard_to_ingesters: HashMap<ShardIndex, IngesterMapping>,
flight_client: Arc<dyn IngesterFlightClient>,
catalog_cache: Arc<CatalogCache>,
backoff_config: BackoffConfig,
) -> Self {
let unique_ingester_addresses: HashSet<_> = shard_to_ingesters
.values()
.flat_map(|v| match v {
IngesterMapping::Addr(addr) => Some(addr),
_ => None,
})
.cloned()
.collect();
let metric_registry = catalog_cache.metric_registry();
let metrics = Arc::new(IngesterConnectionMetrics::new(&metric_registry));
Self {
shard_to_ingesters,
unique_ingester_addresses,
flight_client,
catalog_cache,
metrics,
backoff_config,
}
}
/// Create a new set of connections given a list of ingester2 addresses.
/// Create a new set of connections given a list of ingester addresses.
pub fn by_addrs(
ingester_addresses: Vec<Arc<str>>,
catalog_cache: Arc<CatalogCache>,
@ -443,11 +348,28 @@ impl IngesterConnectionImpl {
circuit_breaker_backoff_config,
));
Self::by_addrs_with_flight_client(
ingester_addresses,
flight_client,
catalog_cache,
backoff_config,
)
}
/// Create new set of connections with specific flight client implementation.
///
/// This is helpful for testing, i.e. when the flight client should not be backed by normal
/// network communication.
pub fn by_addrs_with_flight_client(
ingester_addresses: Vec<Arc<str>>,
flight_client: Arc<dyn IngesterFlightClient>,
catalog_cache: Arc<CatalogCache>,
backoff_config: BackoffConfig,
) -> Self {
let metric_registry = catalog_cache.metric_registry();
let metrics = Arc::new(IngesterConnectionMetrics::new(&metric_registry));
Self {
shard_to_ingesters: HashMap::new(),
unique_ingester_addresses: ingester_addresses.into_iter().collect(),
flight_client,
catalog_cache,
@ -759,7 +681,6 @@ impl IngesterStreamDecoder {
};
let partition = IngesterPartition::new(
Arc::clone(&self.ingester_address),
ingester_uuid,
partition_id,
shard_id,
@ -854,56 +775,12 @@ impl IngesterConnection for IngesterConnectionImpl {
/// Retrieve chunks from the ingester for the particular table, shard, and predicate
async fn partitions(
&self,
shard_indexes: Option<Vec<ShardIndex>>,
namespace_id: NamespaceId,
cached_table: Arc<CachedTable>,
columns: Vec<String>,
predicate: &Predicate,
span: Option<Span>,
) -> Result<Vec<IngesterPartition>> {
let relevant_ingester_addresses = match shard_indexes {
// If shard indexes is None, we're using the RPC write path, and all ingesters should
// be queried.
None => self.unique_ingester_addresses.clone(),
// If shard indexes is Some([]), no ingester addresses can be found. This is a
// configuration problem somewhere.
Some(shard_indexes) if shard_indexes.is_empty() => {
panic!("Called `IngesterConnection.partitions` with an empty `shard_indexes` list");
}
// Otherwise, we're using the write buffer and need to look up the ingesters to contact
// by their shard index.
Some(shard_indexes) => {
// Look up the ingesters needed for the shard. Collect into a HashSet to avoid
// making multiple requests to the same ingester if that ingester is responsible
// for multiple shard_indexes relevant to this query.
let mut relevant_ingester_addresses = HashSet::new();
for shard_index in &shard_indexes {
match self.shard_to_ingesters.get(shard_index) {
None => {
return NoIngesterFoundForShardSnafu {
shard_index: *shard_index,
}
.fail()
}
Some(mapping) => match mapping {
IngesterMapping::Addr(addr) => {
relevant_ingester_addresses.insert(Arc::clone(addr));
}
IngesterMapping::Ignore => (),
IngesterMapping::NotMapped => {
return ShardNotMappedSnafu {
shard_index: *shard_index,
}
.fail()
}
},
}
}
relevant_ingester_addresses
}
};
let mut span_recorder = SpanRecorder::new(span);
let metrics = Arc::clone(&self.metrics);
@ -960,8 +837,10 @@ impl IngesterConnection for IngesterConnectionImpl {
}
};
let mut ingester_partitions: Vec<IngesterPartition> = relevant_ingester_addresses
.into_iter()
let mut ingester_partitions: Vec<IngesterPartition> = self
.unique_ingester_addresses
.iter()
.cloned()
.map(move |ingester_address| measured_ingester_request(ingester_address))
.collect::<FuturesUnordered<_>>()
.try_collect::<Vec<_>>()
@ -1001,8 +880,6 @@ impl IngesterConnection for IngesterConnectionImpl {
/// IngesterPartition for each table the ingester knows about.
#[derive(Debug, Clone)]
pub struct IngesterPartition {
ingester: Arc<str>,
/// If using ingester2/rpc write path, the ingester UUID will be present and will identify
/// whether this ingester has restarted since the last time it was queried or not.
///
@ -1036,7 +913,6 @@ impl IngesterPartition {
/// `RecordBatches` into the correct types
#[allow(clippy::too_many_arguments)]
pub fn new(
ingester: Arc<str>,
ingester_uuid: Option<Uuid>,
partition_id: PartitionId,
shard_id: ShardId,
@ -1046,7 +922,6 @@ impl IngesterPartition {
partition_sort_key: Option<Arc<SortKey>>,
) -> Self {
Self {
ingester,
ingester_uuid,
partition_id,
shard_id,
@ -1120,10 +995,6 @@ impl IngesterPartition {
}
}
pub(crate) fn ingester(&self) -> &Arc<str> {
&self.ingester
}
pub(crate) fn ingester_uuid(&self) -> Option<Uuid> {
self.ingester_uuid
}
@ -1382,11 +1253,7 @@ mod tests {
use metric::Attributes;
use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
use schema::{builder::SchemaBuilder, InfluxFieldType};
use std::{
collections::{BTreeSet, HashMap},
time::Duration,
};
use test_helpers::assert_error;
use std::collections::{BTreeSet, HashMap};
use tokio::{runtime::Handle, sync::Mutex};
use trace::{ctx::SpanContext, span::SpanStatus, RingBufferTraceCollector};
@ -1403,7 +1270,7 @@ mod tests {
.await,
);
let ingester_conn = mock_flight_client.ingester_conn().await;
let err = get_partitions(&ingester_conn, &[1]).await.unwrap_err();
let err = get_partitions(&ingester_conn).await.unwrap_err();
assert_matches!(err, Error::RemoteQuery { .. });
}
@ -1419,7 +1286,7 @@ mod tests {
.await,
);
let ingester_conn = mock_flight_client.ingester_conn().await;
let err = get_partitions(&ingester_conn, &[1]).await.unwrap_err();
let err = get_partitions(&ingester_conn).await.unwrap_err();
assert_matches!(err, Error::RemoteQuery { .. });
}
@ -1436,7 +1303,7 @@ mod tests {
);
let mut ingester_conn = mock_flight_client.ingester_conn().await;
ingester_conn.backoff_config = BackoffConfig::default();
let partitions = get_partitions(&ingester_conn, &[1]).await.unwrap();
let partitions = get_partitions(&ingester_conn).await.unwrap();
assert!(partitions.is_empty());
}
@ -1452,7 +1319,7 @@ mod tests {
.await,
);
let ingester_conn = mock_flight_client.ingester_conn().await;
let err = get_partitions(&ingester_conn, &[1]).await.unwrap_err();
let err = get_partitions(&ingester_conn).await.unwrap_err();
assert_matches!(err, Error::RemoteQuery { .. });
}
@ -1462,7 +1329,7 @@ mod tests {
MockFlightClient::new([("addr1", Ok(MockQueryData { results: vec![] }))]).await,
);
let ingester_conn = mock_flight_client.ingester_conn().await;
let partitions = get_partitions(&ingester_conn, &[1]).await.unwrap();
let partitions = get_partitions(&ingester_conn).await.unwrap();
assert!(partitions.is_empty());
}
@ -1520,24 +1387,10 @@ mod tests {
.await,
);
let ingester_conn = mock_flight_client.ingester_conn().await;
let partitions = get_partitions(&ingester_conn, &[1]).await.unwrap();
let partitions = get_partitions(&ingester_conn).await.unwrap();
assert!(partitions.is_empty());
}
#[tokio::test]
async fn no_ingester_addresses_found_is_a_configuration_error() {
let mock_flight_client = Arc::new(
MockFlightClient::new([("addr1", Ok(MockQueryData { results: vec![] }))]).await,
);
let ingester_conn = mock_flight_client.ingester_conn().await;
// Shard index 0 doesn't have an associated ingester address in the test setup
assert_error!(
get_partitions(&ingester_conn, &[0]).await,
Error::NoIngesterFoundForShard { .. },
);
}
#[tokio::test]
async fn test_flight_no_batches() {
let ingester_uuid = Uuid::new_v4();
@ -1560,7 +1413,7 @@ mod tests {
);
let ingester_conn = mock_flight_client.ingester_conn().await;
let partitions = get_partitions(&ingester_conn, &[1]).await.unwrap();
let partitions = get_partitions(&ingester_conn).await.unwrap();
assert_eq!(partitions.len(), 1);
let p = &partitions[0];
@ -1587,7 +1440,7 @@ mod tests {
.await,
);
let ingester_conn = mock_flight_client.ingester_conn().await;
let err = get_partitions(&ingester_conn, &[1]).await.unwrap_err();
let err = get_partitions(&ingester_conn).await.unwrap_err();
assert_matches!(err, Error::PartitionStatusMissing { .. });
}
@ -1630,7 +1483,7 @@ mod tests {
.await,
);
let ingester_conn = mock_flight_client.ingester_conn().await;
let err = get_partitions(&ingester_conn, &[1]).await.unwrap_err();
let err = get_partitions(&ingester_conn).await.unwrap_err();
assert_matches!(err, Error::DuplicatePartitionInfo { .. });
}
@ -1650,7 +1503,7 @@ mod tests {
.await,
);
let ingester_conn = mock_flight_client.ingester_conn().await;
let err = get_partitions(&ingester_conn, &[1]).await.unwrap_err();
let err = get_partitions(&ingester_conn).await.unwrap_err();
assert_matches!(err, Error::ChunkWithoutPartition { .. });
}
@ -1670,7 +1523,7 @@ mod tests {
.await,
);
let ingester_conn = mock_flight_client.ingester_conn().await;
let err = get_partitions(&ingester_conn, &[1]).await.unwrap_err();
let err = get_partitions(&ingester_conn).await.unwrap_err();
assert_matches!(err, Error::BatchWithoutChunk { .. });
}
@ -1772,7 +1625,7 @@ mod tests {
);
let ingester_conn = mock_flight_client.ingester_conn().await;
let partitions = get_partitions(&ingester_conn, &[1, 2]).await.unwrap();
let partitions = get_partitions(&ingester_conn).await.unwrap();
assert_eq!(partitions.len(), 3);
let p1 = &partitions[0];
@ -1841,7 +1694,6 @@ mod tests {
let columns = vec![String::from("col")];
let err = ingester_conn
.partitions(
None,
NamespaceId::new(1),
cached_table(),
columns,
@ -1904,7 +1756,6 @@ mod tests {
let columns = vec![String::from("col")];
let partitions = ingester_conn
.partitions(
None,
NamespaceId::new(1),
cached_table(),
columns,
@ -1978,7 +1829,6 @@ mod tests {
let traces = Arc::new(RingBufferTraceCollector::new(100));
get_partitions_with_span(
&ingester_conn,
&[1, 2, 3, 4, 5],
Some(Span::root("root", Arc::clone(&traces) as _)),
)
.await
@ -2046,83 +1896,19 @@ mod tests {
assert_eq!(n_spans_ok_or_cancelled, 4);
}
#[tokio::test]
async fn test_flight_per_shard_querying() {
let ingester_uuid = Uuid::new_v4();
let record_batch_1_1 = lp_to_record_batch("table foo=1 1");
let schema_1_1 = record_batch_1_1.schema();
let mock_flight_client = Arc::new(
MockFlightClient::new([
(
"addr1",
Ok(MockQueryData {
results: vec![
metadata(
1,
Some(PartitionStatus {
parquet_max_sequence_number: Some(11),
}),
ingester_uuid.to_string(),
5,
),
Ok((
DecodedPayload::Schema(Arc::clone(&schema_1_1)),
IngesterQueryResponseMetadata::default(),
)),
Ok((
DecodedPayload::RecordBatch(record_batch_1_1),
IngesterQueryResponseMetadata::default(),
)),
],
}),
),
(
"addr2",
Err(FlightClientError::Flight {
source: tonic::Status::internal("if this is queried, the test should fail")
.into(),
}),
),
])
.await,
);
let ingester_conn = mock_flight_client.ingester_conn().await;
// Only use shard index 1, which will correspond to only querying the ingester at
// "addr1"
let partitions = get_partitions(&ingester_conn, &[1]).await.unwrap();
assert_eq!(partitions.len(), 1);
let p1 = &partitions[0];
assert_eq!(p1.partition_id.get(), 1);
assert_eq!(p1.shard_id.get(), 1);
assert_eq!(
p1.parquet_max_sequence_number,
Some(SequenceNumber::new(11))
);
assert_eq!(p1.tombstone_max_sequence_number, None);
assert_eq!(p1.chunks.len(), 1);
}
async fn get_partitions(
ingester_conn: &IngesterConnectionImpl,
shard_indexes: &[i32],
) -> Result<Vec<IngesterPartition>, Error> {
get_partitions_with_span(ingester_conn, shard_indexes, None).await
get_partitions_with_span(ingester_conn, None).await
}
async fn get_partitions_with_span(
ingester_conn: &IngesterConnectionImpl,
shard_indexes: &[i32],
span: Option<Span>,
) -> Result<Vec<IngesterPartition>, Error> {
let columns = vec![String::from("col")];
let shard_indexes: Vec<_> = shard_indexes.iter().copied().map(ShardIndex::new).collect();
ingester_conn
.partitions(
Some(shard_indexes),
NamespaceId::new(1),
cached_table(),
columns,
@ -2215,25 +2001,11 @@ mod tests {
}
}
// Assign one shard per address, sorted consistently.
// Don't assign any addresses to shard index 0 to test error case
async fn ingester_conn(self: &Arc<Self>) -> IngesterConnectionImpl {
let ingester_addresses: BTreeSet<_> =
self.responses.lock().await.keys().cloned().collect();
let shard_to_ingesters = ingester_addresses
.into_iter()
.enumerate()
.map(|(shard_index, ingester_address)| {
(
ShardIndex::new(shard_index as i32 + 1),
IngesterMapping::Addr(Arc::from(ingester_address.as_str())),
)
})
.collect();
IngesterConnectionImpl::by_shard_with_flight_client(
shard_to_ingesters,
IngesterConnectionImpl::by_addrs_with_flight_client(
ingester_addresses.into_iter().map(Into::into).collect(),
Arc::clone(self) as _,
Arc::new(CatalogCache::new_testing(
self.catalog.catalog(),
@ -2290,7 +2062,6 @@ mod tests {
let tombstone_max_sequence_number = None;
// Construct a partition and ensure it doesn't error
let ingester_partition = IngesterPartition::new(
"ingester".into(),
Some(ingester_uuid),
PartitionId::new(1),
ShardId::new(1),
@ -2323,7 +2094,6 @@ mod tests {
let parquet_max_sequence_number = None;
let tombstone_max_sequence_number = None;
let err = IngesterPartition::new(
"ingester".into(),
Some(ingester_uuid),
PartitionId::new(1),
ShardId::new(1),

View File

@ -1,13 +1,10 @@
use crate::cache::namespace::CachedTable;
use super::IngesterConnection;
use crate::cache::namespace::CachedTable;
use async_trait::async_trait;
use data_types::NamespaceId;
use data_types::ShardIndex;
use iox_query::util::create_basic_summary;
use parking_lot::Mutex;
use schema::Projection;
use schema::Schema as IOxSchema;
use schema::{Projection, Schema as IOxSchema};
use std::{any::Any, sync::Arc};
use trace::span::Span;
@ -34,7 +31,6 @@ impl MockIngesterConnection {
impl IngesterConnection for MockIngesterConnection {
async fn partitions(
&self,
_shard_indexes: Option<Vec<ShardIndex>>,
_namespace_id: NamespaceId,
_cached_table: Arc<CachedTable>,
columns: Vec<String>,

View File

@ -7,9 +7,8 @@ use crate::{
query_log::QueryLog,
table::{PruneMetrics, QuerierTable, QuerierTableArgs},
};
use data_types::{NamespaceId, ShardIndex};
use data_types::NamespaceId;
use iox_query::exec::Executor;
use sharder::JumpHash;
use std::{collections::HashMap, sync::Arc};
mod query_access;
@ -58,7 +57,6 @@ impl QuerierNamespace {
exec: Arc<Executor>,
ingester_connection: Option<Arc<dyn IngesterConnection>>,
query_log: Arc<QueryLog>,
sharder: Option<Arc<JumpHash<Arc<ShardIndex>>>>,
prune_metrics: Arc<PruneMetrics>,
) -> Self {
let tables: HashMap<_, _> = ns
@ -66,7 +64,6 @@ impl QuerierNamespace {
.iter()
.map(|(table_name, cached_table)| {
let table = Arc::new(QuerierTable::new(QuerierTableArgs {
sharder: sharder.clone(),
namespace_id: ns.id,
namespace_name: Arc::clone(&name),
namespace_retention_period: ns.retention_period,
@ -103,11 +100,9 @@ impl QuerierNamespace {
ns: Arc<CachedNamespace>,
exec: Arc<Executor>,
ingester_connection: Option<Arc<dyn IngesterConnection>>,
sharder: Arc<JumpHash<Arc<ShardIndex>>>,
write_rpc: bool,
) -> Self {
let time_provider = catalog_cache.time_provider();
let chunk_adapter = Arc::new(ChunkAdapter::new(catalog_cache, metric_registry, write_rpc));
let chunk_adapter = Arc::new(ChunkAdapter::new(catalog_cache, metric_registry));
let query_log = Arc::new(QueryLog::new(10, time_provider));
let prune_metrics = Arc::new(PruneMetrics::new(&chunk_adapter.metric_registry()));
@ -118,7 +113,6 @@ impl QuerierNamespace {
exec,
ingester_connection,
query_log,
Some(sharder),
prune_metrics,
)
}

View File

@ -2,12 +2,11 @@ use super::QuerierNamespace;
use crate::{
cache::namespace::CachedNamespace, create_ingester_connection_for_testing, QuerierCatalogCache,
};
use data_types::{ShardIndex, TableId};
use data_types::TableId;
use datafusion_util::config::register_iox_object_store;
use iox_catalog::interface::{get_schema_by_name, SoftDeletedRows};
use iox_query::exec::ExecutorType;
use iox_tests::TestNamespace;
use sharder::JumpHash;
use std::sync::Arc;
use tokio::runtime::Handle;
@ -45,8 +44,6 @@ pub async fn querier_namespace(ns: &Arc<TestNamespace>) -> QuerierNamespace {
Arc::clone(parquet_store.object_store()),
);
let sharder = Arc::new(JumpHash::new((0..1).map(ShardIndex::new).map(Arc::new)));
QuerierNamespace::new_testing(
catalog_cache,
ns.catalog.metric_registry(),
@ -54,8 +51,6 @@ pub async fn querier_namespace(ns: &Arc<TestNamespace>) -> QuerierNamespace {
cached_ns,
ns.catalog.exec(),
Some(create_ingester_connection_for_testing()),
sharder,
true,
)
}

View File

@ -33,22 +33,14 @@ pub struct ChunkAdapter {
/// Metric registry.
metric_registry: Arc<metric::Registry>,
/// Use RPC-write path (aka router2/ingester2).
rpc_write: bool,
}
impl ChunkAdapter {
/// Create new adapter with empty cache.
pub fn new(
catalog_cache: Arc<CatalogCache>,
metric_registry: Arc<metric::Registry>,
rpc_write: bool,
) -> Self {
pub fn new(catalog_cache: Arc<CatalogCache>, metric_registry: Arc<metric::Registry>) -> Self {
Self {
catalog_cache,
metric_registry,
rpc_write,
}
}
@ -227,11 +219,7 @@ impl ChunkAdapter {
let chunk_id = ChunkId::from(Uuid::from_u128(parquet_file.id.get() as _));
let order = if self.rpc_write {
ChunkOrder::new(parquet_file.max_l0_created_at.get())
} else {
ChunkOrder::new(parquet_file.max_sequence_number.get())
};
let order = ChunkOrder::new(parquet_file.max_l0_created_at.get());
let meta = Arc::new(QuerierParquetChunkMeta {
parquet_file_id: parquet_file.id,

View File

@ -276,7 +276,6 @@ pub mod tests {
&Handle::current(),
)),
catalog.metric_registry(),
true,
);
Self {

View File

@ -1,25 +1,22 @@
use self::query_access::QuerierTableChunkPruner;
use self::state_reconciler::Reconciler;
use self::{query_access::QuerierTableChunkPruner, state_reconciler::Reconciler};
use crate::{
ingester::{self, IngesterPartition},
parquet::ChunkAdapter,
IngesterConnection,
};
use data_types::{ColumnId, DeletePredicate, NamespaceId, PartitionId, ShardIndex, TableId};
use data_types::{ColumnId, DeletePredicate, NamespaceId, TableId};
use datafusion::error::DataFusionError;
use futures::join;
use iox_query::{provider, provider::ChunkPruner, QueryChunk};
use observability_deps::tracing::{debug, trace};
use predicate::Predicate;
use schema::Schema;
use sharder::JumpHash;
use snafu::{ResultExt, Snafu};
use std::borrow::Cow;
use std::collections::HashSet;
use std::time::Duration;
use std::{
collections::{hash_map::Entry, HashMap},
borrow::Cow,
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
};
use trace::span::{Span, SpanRecorder};
use uuid::Uuid;
@ -39,18 +36,6 @@ pub enum Error {
#[snafu(display("Error getting partitions from ingester: {}", source))]
GettingIngesterPartitions { source: ingester::Error },
#[snafu(display(
"Ingester '{}' and '{}' both provide data for partition {}",
ingester1,
ingester2,
partition
))]
IngestersOverlap {
ingester1: Arc<str>,
ingester2: Arc<str>,
partition: PartitionId,
},
#[snafu(display("Cannot combine ingester data with catalog/cache: {}", source))]
StateFusion {
source: state_reconciler::ReconcileError,
@ -70,7 +55,6 @@ impl From<Error> for DataFusionError {
/// Args to create a [`QuerierTable`].
pub struct QuerierTableArgs {
pub sharder: Option<Arc<JumpHash<Arc<ShardIndex>>>>,
pub namespace_id: NamespaceId,
pub namespace_name: Arc<str>,
pub namespace_retention_period: Option<Duration>,
@ -85,10 +69,6 @@ pub struct QuerierTableArgs {
/// Table representation for the querier.
#[derive(Debug)]
pub struct QuerierTable {
/// Sharder to query for which shards are responsible for the table's data. If not specified,
/// query all ingesters because we're using the RPC write path.
sharder: Option<Arc<JumpHash<Arc<ShardIndex>>>>,
/// Namespace the table is in
namespace_name: Arc<str>,
@ -121,7 +101,6 @@ impl QuerierTable {
/// Create new table.
pub fn new(args: QuerierTableArgs) -> Self {
let QuerierTableArgs {
sharder,
namespace_id,
namespace_name,
namespace_retention_period,
@ -134,7 +113,6 @@ impl QuerierTable {
} = args;
Self {
sharder,
namespace_name,
namespace_id,
namespace_retention_period,
@ -241,8 +219,7 @@ impl QuerierTable {
),
catalog_cache.parquet_file().get(
self.id(),
None,
None,
HashMap::new(),
span_recorder.child_span("cache GET parquet_file (pre-warm")
),
catalog_cache.tombstone().get(
@ -255,34 +232,20 @@ impl QuerierTable {
// handle errors / cache refresh
let partitions = partitions?;
// determine max parquet sequence number for cache invalidation, if using the write buffer
// path.
let max_parquet_sequence_number = if self.rpc_write() {
None
} else {
partitions
.iter()
.flat_map(|p| p.parquet_max_sequence_number())
.max()
};
let max_tombstone_sequence_number = partitions
.iter()
.flat_map(|p| p.tombstone_max_sequence_number())
.max();
// If using the RPC write path, determine number of persisted parquet files per ingester
// UUID seen in the ingester query responses for cache invalidation. If this is an empty
// HashMap, then there are no results from the ingesters.
let persisted_file_counts_by_ingester_uuid = if self.rpc_write() {
Some(collect_persisted_file_counts(
partitions.len(),
partitions
.iter()
.map(|p| (p.ingester_uuid(), p.completed_persistence_count())),
))
} else {
None
};
// Determine number of persisted parquet files per ingester UUID seen in the ingester query
// responses for cache invalidation. If `persisted_file_counts_by_ingester_uuid` is empty,
// then there are no results from the ingesters.
let persisted_file_counts_by_ingester_uuid = collect_persisted_file_counts(
partitions.len(),
partitions
.iter()
.map(|p| (p.ingester_uuid(), p.completed_persistence_count())),
);
debug!(
namespace=%self.namespace_name,
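The cache-invalidation input is now just this per-ingester-UUID map of persisted file counts. A minimal sketch of what a helper like `collect_persisted_file_counts` could do, inferred only from how it is called above and from the `sum_up_persisted_file_counts` test further down; the exact IOx signature and semantics are assumptions:

```rust
use std::collections::HashMap;
use uuid::Uuid;

/// Sum completed-persistence counts per ingester UUID, skipping partitions
/// that did not report a UUID. An empty map means "no ingester responses".
fn collect_persisted_file_counts(
    capacity: usize,
    counts: impl Iterator<Item = (Option<Uuid>, u64)>,
) -> HashMap<Uuid, u64> {
    let mut out = HashMap::with_capacity(capacity);
    for (uuid, count) in counts {
        if let Some(uuid) = uuid {
            *out.entry(uuid).or_default() += count;
        }
    }
    out
}
```

Passing the partition count as the capacity hint matches the `partitions.len()` argument in the call above.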
@ -297,7 +260,6 @@ impl QuerierTable {
let (parquet_files, tombstones) = join!(
catalog_cache.parquet_file().get(
self.id(),
max_parquet_sequence_number,
persisted_file_counts_by_ingester_uuid,
span_recorder.child_span("cache GET parquet_file"),
),
@ -333,7 +295,6 @@ impl QuerierTable {
Arc::clone(&self.table_name),
Arc::clone(&self.namespace_name),
Arc::clone(self.chunk_adapter.catalog_cache()),
self.rpc_write(),
);
// create parquet files
@ -433,15 +394,6 @@ impl QuerierTable {
// The provided projection should include all columns needed by the query
let columns = self.schema.select_given_and_pk_columns(projection);
// Get the shard indexes responsible for this table's data from the sharder to
// determine which ingester(s) to query.
// Currently, the sharder will only return one shard index per table, but in the
// near future, the sharder might return more than one shard index for one table.
let shard_indexes = self
.sharder
.as_ref()
.map(|sharder| vec![**sharder.shard_for_query(&self.table_name, &self.namespace_name)]);
// get cached table w/o any must-coverage information
let Some(cached_table) = self.chunk_adapter
.catalog_cache()
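The block removed above is where the write-buffer path asked the `sharder` crate's `JumpHash` which shard index, and therefore which single ingester, to contact for a given table and namespace; on the RPC write path every ingester is queried, so the lookup goes away. For background only, a sketch of generic jump consistent hashing (the Lamport/Veach algorithm), not the `sharder` crate's actual implementation:

```rust
/// Map a 64-bit key to one of `num_buckets` buckets so that the assignment is
/// stable and only about 1/n of the keys move when a bucket is added.
fn jump_consistent_hash(mut key: u64, num_buckets: i32) -> i32 {
    let mut b: i64 = -1;
    let mut j: i64 = 0;
    while j < num_buckets as i64 {
        b = j;
        key = key.wrapping_mul(2862933555777941757).wrapping_add(1);
        j = ((b + 1) as f64 * ((1u64 << 31) as f64 / (((key >> 33) + 1) as f64))) as i64;
    }
    b as i32
}

fn main() {
    // For a fixed bucket count the same key always lands in the same bucket,
    // which is the determinism the removed `shard_for_query` lookup relied on.
    assert_eq!(jump_consistent_hash(42, 4), jump_consistent_hash(42, 4));
}
```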
@ -460,7 +412,6 @@ impl QuerierTable {
// get any chunks from the ingester(s)
let partitions_result = ingester_connection
.partitions(
shard_indexes,
self.namespace_id,
cached_table,
columns,
@ -472,34 +423,9 @@ impl QuerierTable {
let partitions = partitions_result?;
if !self.rpc_write() {
// check that partitions from ingesters don't overlap
let mut seen = HashMap::with_capacity(partitions.len());
for partition in &partitions {
match seen.entry(partition.partition_id()) {
Entry::Occupied(o) => {
return Err(Error::IngestersOverlap {
ingester1: Arc::clone(o.get()),
ingester2: Arc::clone(partition.ingester()),
partition: partition.partition_id(),
})
}
Entry::Vacant(v) => {
v.insert(Arc::clone(partition.ingester()));
}
}
}
}
Ok(partitions)
}
/// Whether we're using the RPC write path or not. Write buffer mode will always specify a
/// sharder; the RPC write path never will.
fn rpc_write(&self) -> bool {
self.sharder.is_none()
}
/// clear the parquet file cache
#[cfg(test)]
fn clear_parquet_cache(&self) {
@ -547,8 +473,7 @@ mod tests {
table::test_util::{querier_table, IngesterPartitionBuilder},
};
use arrow_util::assert_batches_eq;
use assert_matches::assert_matches;
use data_types::{ChunkId, ColumnType, CompactionLevel, SequenceNumber};
use data_types::{ChunkId, ColumnType, SequenceNumber};
use iox_query::exec::IOxSessionContext;
use iox_tests::{TestCatalog, TestParquetFileBuilder, TestTable};
use iox_time::TimeProvider;
@ -556,7 +481,7 @@ mod tests {
use schema::{builder::SchemaBuilder, InfluxFieldType};
use std::sync::Arc;
use test_helpers::maybe_start_logging;
use trace::{span::SpanStatus, RingBufferTraceCollector};
use trace::RingBufferTraceCollector;
#[test]
fn sum_up_persisted_file_counts() {
@ -897,312 +822,6 @@ mod tests {
assert_batches_eq!(&expected, &batches);
}
#[tokio::test]
async fn test_compactor_collision() {
maybe_start_logging();
let catalog = TestCatalog::new();
let ns = catalog.create_namespace_1hr_retention("ns").await;
let table = ns.create_table("table").await;
let shard = ns.create_shard(1).await;
let partition = table.with_shard(&shard).create_partition("k").await;
let schema = make_schema(&table).await;
// create a parquet file that cannot be processed by the querier:
//
// [diagram: a sequence-number axis marked 0, 1, 2, indicating which range each
// of the items below covers]
//
// Available Information:
// ( ingester reports as "persisted" )
// ( ingester in-mem data )
// ( parquet file )
//
// Desired Information:
// ( wanted parquet data )
// ( ignored parquet data )
// ( ingester in-mem data )
//
// However, there is no way to split the parquet data into the "wanted" and "ignored" parts
// because we don't have row-level sequence numbers.
let builder = TestParquetFileBuilder::default()
.with_line_protocol("table foo=1 11")
.with_max_seq(2)
.with_compaction_level(CompactionLevel::FileNonOverlapped);
partition.create_parquet_file(builder).await;
let builder = IngesterPartitionBuilder::new(schema, &shard, &partition);
let ingester_partition =
builder.build_with_max_parquet_sequence_number(Some(SequenceNumber::new(1)));
let querier_table = TestQuerierTable::new(&catalog, &table)
.await
.with_ingester_partition(ingester_partition);
let err = querier_table.chunks().await.unwrap_err();
assert_matches!(err, Error::StateFusion { .. });
}
#[tokio::test]
async fn test_state_reconcile() {
maybe_start_logging();
let catalog = TestCatalog::new();
// infinite retention
let ns = catalog.create_namespace_with_retention("ns", None).await;
let table = ns.create_table("table").await;
let shard = ns.create_shard(1).await;
let partition1 = table.with_shard(&shard).create_partition("k1").await;
let partition2 = table.with_shard(&shard).create_partition("k2").await;
table.create_column("time", ColumnType::Time).await;
table.create_column("foo", ColumnType::F64).await;
// kept because max sequence number <= 2
let builder = TestParquetFileBuilder::default()
.with_line_protocol("table foo=1 11")
.with_max_seq(2);
let file1 = partition1.create_parquet_file(builder).await;
// pruned because min sequence number > 2
let builder = TestParquetFileBuilder::default()
.with_line_protocol("table foo=2 22")
.with_max_seq(3);
partition1.create_parquet_file(builder).await;
// kept because max sequence number <= 3
let builder = TestParquetFileBuilder::default()
.with_line_protocol("table foo=1 11")
.with_max_seq(3);
let file2 = partition2.create_parquet_file(builder).await;
// pruned because min sequence number > 3
let builder = TestParquetFileBuilder::default()
.with_line_protocol("table foo=2 22")
.with_max_seq(4);
partition2.create_parquet_file(builder).await;
// partition1: kept because sequence number <= 10
// partition2: kept because sequence number <= 11
table
.with_shard(&shard)
.create_tombstone(10, 1, 100, "foo=1")
.await;
// partition1: pruned because sequence number > 10
// partition2: kept because sequence number <= 11
table
.with_shard(&shard)
.create_tombstone(11, 1, 100, "foo=2")
.await;
// partition1: pruned because sequence number > 10
// partition2: pruned because sequence number > 11
table
.with_shard(&shard)
.create_tombstone(12, 1, 100, "foo=3")
.await;
let schema = SchemaBuilder::new()
.influx_field("foo", InfluxFieldType::Integer)
.timestamp()
.build()
.unwrap();
let ingester_chunk_id1 = u128::MAX - 1;
let builder1 = IngesterPartitionBuilder::new(schema.clone(), &shard, &partition1);
let builder2 = IngesterPartitionBuilder::new(schema, &shard, &partition2);
let querier_table = TestQuerierTable::new(&catalog, &table)
.await
.with_ingester_partition(
// this chunk is kept
builder1
.with_ingester_chunk_id(ingester_chunk_id1)
.with_lp(["table foo=3i 33"])
.build(
// parquet max persisted sequence number
Some(SequenceNumber::new(2)),
// tombstone max persisted sequence number
Some(SequenceNumber::new(10)),
),
)
.with_ingester_partition(
// this chunk is filtered out because it has no record batches but the reconciling
// still takes place
builder2.with_ingester_chunk_id(u128::MAX).build(
// parquet max persisted sequence number
Some(SequenceNumber::new(3)),
// tombstone max persisted sequence number
Some(SequenceNumber::new(11)),
),
);
let mut chunks = querier_table.chunks().await.unwrap();
chunks.sort_by_key(|c| c.id());
// three chunks (two parquet files and one for the in-mem ingester data)
assert_eq!(chunks.len(), 3);
// check IDs
assert_eq!(
chunks[0].id(),
ChunkId::new_test(file1.parquet_file.id.get() as u128),
);
assert_eq!(
chunks[1].id(),
ChunkId::new_test(file2.parquet_file.id.get() as u128),
);
assert_eq!(chunks[2].id(), ChunkId::new_test(ingester_chunk_id1));
// check types
assert_eq!(chunks[0].chunk_type(), "parquet");
assert_eq!(chunks[1].chunk_type(), "parquet");
assert_eq!(chunks[2].chunk_type(), "IngesterPartition");
// check delete predicates
// parquet chunks have predicate attached
assert_eq!(chunks[0].delete_predicates().len(), 1);
assert_eq!(chunks[1].delete_predicates().len(), 2);
// ingester in-mem chunk doesn't need predicates, because the ingester has already
// materialized them for us
assert_eq!(chunks[2].delete_predicates().len(), 0);
// check spans
let root_span = querier_table
.traces
.spans()
.into_iter()
.find(|s| s.name == "root")
.expect("root span not found");
assert_eq!(root_span.status, SpanStatus::Ok);
let ip_span = querier_table
.traces
.spans()
.into_iter()
.find(|s| s.name == "ingester partitions")
.expect("ingester partitions span not found");
assert_eq!(ip_span.status, SpanStatus::Ok);
}
#[tokio::test]
async fn test_ingester_overlap_detection() {
maybe_start_logging();
let catalog = TestCatalog::new();
let ns = catalog.create_namespace_1hr_retention("ns").await;
let table = ns.create_table("table").await;
let shard = ns.create_shard(1).await;
let partition1 = table.with_shard(&shard).create_partition("k1").await;
let partition2 = table.with_shard(&shard).create_partition("k2").await;
let schema = SchemaBuilder::new()
.influx_field("foo", InfluxFieldType::Integer)
.timestamp()
.build()
.unwrap();
let builder1 = IngesterPartitionBuilder::new(schema.clone(), &shard, &partition1);
let builder2 = IngesterPartitionBuilder::new(schema, &shard, &partition2);
let querier_table = TestQuerierTable::new(&catalog, &table)
.await
.with_ingester_partition(
builder1
.clone()
.with_ingester_chunk_id(1)
.with_lp(vec!["table foo=1i 1"])
.build(
// parquet max persisted sequence number
None, // tombstone max persisted sequence number
None,
),
)
.with_ingester_partition(
builder2
.with_ingester_chunk_id(2)
.with_lp(vec!["table foo=2i 2"])
.build(
// parquet max persisted sequence number
None, // tombstone max persisted sequence number
None,
),
)
.with_ingester_partition(
builder1
.with_ingester_chunk_id(3)
.with_lp(vec!["table foo=3i 3"])
.build(
// parquet max persisted sequence number
None, // tombstone max persisted sequence number
None,
),
);
let err = querier_table.chunks().await.unwrap_err();
assert_matches!(err, Error::IngestersOverlap { .. });
}
#[tokio::test]
async fn test_parquet_cache_refresh() {
maybe_start_logging();
let catalog = TestCatalog::new();
let ns = catalog.create_namespace_1hr_retention("ns").await;
let table = ns.create_table("table1").await;
let shard = ns.create_shard(1).await;
let partition = table.with_shard(&shard).create_partition("k").await;
let schema = make_schema(&table).await;
let builder =
IngesterPartitionBuilder::new(schema, &shard, &partition).with_lp(["table foo=1 1"]);
// Parquet file with max sequence number 2
let pf_builder = TestParquetFileBuilder::default()
.with_line_protocol("table1 foo=1 11")
.with_max_seq(2);
partition.create_parquet_file(pf_builder).await;
let ingester_partition =
builder.build_with_max_parquet_sequence_number(Some(SequenceNumber::new(2)));
let querier_table = TestQuerierTable::new(&catalog, &table)
.await
.with_ingester_partition(ingester_partition);
// Expect 2 chunks: one for ingester, and one from parquet file
let chunks = querier_table.chunks().await.unwrap();
assert_eq!(chunks.len(), 2);
// Now, make a second chunk with max sequence number 3
let pf_builder = TestParquetFileBuilder::default()
.with_line_protocol("table1 foo=1 22")
.with_max_seq(3);
partition.create_parquet_file(pf_builder).await;
// With the same ingester response, still expect 2 chunks: one
// for ingester, and one from parquet file
let chunks = querier_table.chunks().await.unwrap();
assert_eq!(chunks.len(), 2);
// update the ingester response to return a new max parquet
// sequence number that includes the new file (3)
let ingester_partition =
builder.build_with_max_parquet_sequence_number(Some(SequenceNumber::new(3)));
let querier_table = querier_table
.clear_ingester_partitions()
.with_ingester_partition(ingester_partition);
// expect the second file is found, resulting in three chunks
let chunks = querier_table.chunks().await.unwrap();
assert_eq!(chunks.len(), 3);
}
#[tokio::test]
async fn test_tombstone_cache_refresh() {
maybe_start_logging();

View File

@ -2,7 +2,7 @@
mod interface;
use data_types::{CompactionLevel, DeletePredicate, PartitionId, ShardId, Tombstone, TombstoneId};
use data_types::{DeletePredicate, PartitionId, ShardId, Tombstone, TombstoneId};
use iox_query::QueryChunk;
use observability_deps::tracing::debug;
use schema::sort::SortKey;
@ -18,7 +18,7 @@ use crate::{
tombstone::QuerierTombstone, IngesterPartition,
};
use self::interface::{IngesterPartitionInfo, ParquetFileInfo, TombstoneInfo};
use self::interface::{IngesterPartitionInfo, TombstoneInfo};
#[derive(Snafu, Debug)]
#[allow(missing_copy_implementations)]
@ -33,9 +33,6 @@ pub struct Reconciler {
table_name: Arc<str>,
namespace_name: Arc<str>,
catalog_cache: Arc<CatalogCache>,
/// Whether the querier is running in RPC write mode. This can be removed when the switch to
/// the RPC write design is complete.
rpc_write: bool,
}
impl Reconciler {
@ -43,13 +40,11 @@ impl Reconciler {
table_name: Arc<str>,
namespace_name: Arc<str>,
catalog_cache: Arc<CatalogCache>,
rpc_write: bool,
) -> Self {
Self {
table_name,
namespace_name,
catalog_cache,
rpc_write,
}
}
@ -118,19 +113,14 @@ impl Reconciler {
.push(tombstone);
}
// Do not filter based on max sequence number in RPC write mode because sequence numbers
// are no longer relevant
let parquet_files = if self.rpc_write {
parquet_files
} else {
filter_parquet_files(ingester_partitions, parquet_files)?
};
debug!(
namespace=%self.namespace_name(),
table_name=%self.table_name(),
n=parquet_files.len(),
parquet_ids=?parquet_files.iter().map(|f| f.meta().parquet_file_id().get()).collect::<Vec<_>>(),
parquet_ids=?parquet_files
.iter()
.map(|f| f.meta().parquet_file_id().get())
.collect::<Vec<_>>(),
"Parquet files after filtering"
);
@ -331,86 +321,6 @@ impl UpdatableQuerierChunk for IngesterChunk {
}
}
/// Filter out parquet files that contain "too new" data.
///
/// The caller may only use the returned parquet files.
///
/// This will remove files that are part of the catalog but that contain data that the ingester
/// persisted AFTER the querier contacted it. See module-level documentation about the order in
/// which the communication and the information processing should take place.
///
/// Note that the querier (and this method) do NOT care about the actual age of the parquet files,
/// since the compactor is free to process files at any given moment (e.g. to combine them or to
/// materialize tombstones). However, if the compactor combines files in a way that the querier
/// would need to split them into "desired" data and "too new" data, then we currently bail out
/// with [`ReconcileError`].
fn filter_parquet_files<I, P>(
ingester_partitions: &[I],
parquet_files: Vec<P>,
) -> Result<Vec<P>, ReconcileError>
where
I: IngesterPartitionInfo,
P: ParquetFileInfo,
{
// Build partition-based lookup table.
//
// Note that we don't need to take the shard ID into account here because each partition is
// not only bound to a table but also to a shard.
let lookup_table: HashMap<PartitionId, &I> = ingester_partitions
.iter()
.map(|i| (i.partition_id(), i))
.collect();
// we assume that we filter out only a small number of files, so we can use the same capacity
let mut result = Vec::with_capacity(parquet_files.len());
for file in parquet_files {
if let Some(ingester_partition) = lookup_table.get(&file.partition_id()) {
if let Some(persisted_max) = ingester_partition.parquet_max_sequence_number() {
debug!(
file_partition_id=%file.partition_id(),
file_max_seq_num=%file.max_sequence_number().get(),
persisted_max=%persisted_max.get(),
"Comparing parquet file and ingester parquet max"
);
// This is the result of the compactor compacting files persisted by the ingester after
// persisted_max. The compacted result may include data from before and after persisted_max,
// which prevents this query from returning correct results because it only needs data from
// before persisted_max.
if file.compaction_level() != CompactionLevel::Initial
&& file.max_sequence_number() > persisted_max
{
return Err(ReconcileError::CompactorConflict);
}
if file.max_sequence_number() > persisted_max {
// filter out, file is newer
continue;
}
} else {
debug!(
file_partition_id=%file.partition_id(),
file_max_seq_num=%file.max_sequence_number().get(),
"ingester thinks it doesn't have data persisted yet"
);
// ingester thinks it doesn't have any data persisted yet => can safely ignore file
continue;
}
} else {
debug!(
file_partition_id=%file.partition_id(),
file_max_seq_num=%file.max_sequence_number().get(),
"partition was not flagged by the ingester as unpersisted"
);
// partition was not flagged by the ingester as "unpersisted", so we can keep the
// parquet file
}
result.push(file);
}
Ok(result)
}
/// Generates "exclude" filter for tombstones.
///
/// Since tombstones are shard-wide but data persistence is partition-based (which are
@ -459,109 +369,9 @@ where
#[cfg(test)]
mod tests {
use super::*;
use assert_matches::assert_matches;
use super::{interface::ParquetFileInfo, *};
use data_types::{CompactionLevel, SequenceNumber};
#[test]
fn test_filter_parquet_files_empty() {
let actual =
filter_parquet_files::<MockIngesterPartitionInfo, MockParquetFileInfo>(&[], vec![])
.unwrap();
assert_eq!(actual, vec![]);
}
#[test]
fn test_filter_parquet_files_compactor_conflict() {
let ingester_partitions = &[MockIngesterPartitionInfo {
partition_id: PartitionId::new(1),
shard_id: ShardId::new(1),
parquet_max_sequence_number: Some(SequenceNumber::new(10)),
tombstone_max_sequence_number: None,
}];
let parquet_files = vec![MockParquetFileInfo {
partition_id: PartitionId::new(1),
max_sequence_number: SequenceNumber::new(11),
compaction_level: CompactionLevel::FileNonOverlapped,
}];
let err = filter_parquet_files(ingester_partitions, parquet_files).unwrap_err();
assert_matches!(err, ReconcileError::CompactorConflict);
}
#[test]
fn test_filter_parquet_files_many() {
let ingester_partitions = &[
MockIngesterPartitionInfo {
partition_id: PartitionId::new(1),
shard_id: ShardId::new(1),
parquet_max_sequence_number: Some(SequenceNumber::new(10)),
tombstone_max_sequence_number: None,
},
MockIngesterPartitionInfo {
partition_id: PartitionId::new(2),
shard_id: ShardId::new(1),
parquet_max_sequence_number: None,
tombstone_max_sequence_number: None,
},
MockIngesterPartitionInfo {
partition_id: PartitionId::new(3),
shard_id: ShardId::new(1),
parquet_max_sequence_number: Some(SequenceNumber::new(3)),
tombstone_max_sequence_number: None,
},
];
let pf11 = MockParquetFileInfo {
partition_id: PartitionId::new(1),
max_sequence_number: SequenceNumber::new(9),
compaction_level: CompactionLevel::Initial,
};
let pf12 = MockParquetFileInfo {
partition_id: PartitionId::new(1),
max_sequence_number: SequenceNumber::new(10),
compaction_level: CompactionLevel::Initial,
};
// filtered because it was persisted after ingester sent response (20 > 10)
let pf13 = MockParquetFileInfo {
partition_id: PartitionId::new(1),
max_sequence_number: SequenceNumber::new(20),
compaction_level: CompactionLevel::Initial,
};
let pf2 = MockParquetFileInfo {
partition_id: PartitionId::new(2),
max_sequence_number: SequenceNumber::new(0),
compaction_level: CompactionLevel::Initial,
};
let pf31 = MockParquetFileInfo {
partition_id: PartitionId::new(3),
max_sequence_number: SequenceNumber::new(3),
compaction_level: CompactionLevel::Initial,
};
// filtered because it was persisted after ingester sent response (5 > 3)
let pf32 = MockParquetFileInfo {
partition_id: PartitionId::new(3),
max_sequence_number: SequenceNumber::new(5),
compaction_level: CompactionLevel::Initial,
};
// passed because it came from a partition (4) the ingester didn't know about
let pf4 = MockParquetFileInfo {
partition_id: PartitionId::new(4),
max_sequence_number: SequenceNumber::new(0),
compaction_level: CompactionLevel::Initial,
};
let parquet_files = vec![
pf11.clone(),
pf12.clone(),
pf13,
pf2,
pf31.clone(),
pf32,
pf4.clone(),
];
let actual = filter_parquet_files(ingester_partitions, parquet_files).unwrap();
let expected = vec![pf11, pf12, pf31, pf4];
assert_eq!(actual, expected);
}
#[test]
fn test_filter_tombstones_empty() {
let actual =

View File

@ -4,12 +4,11 @@ use crate::{
IngesterPartition,
};
use arrow::record_batch::RecordBatch;
use data_types::{ChunkId, SequenceNumber, ShardIndex};
use data_types::{ChunkId, SequenceNumber};
use iox_catalog::interface::{get_schema_by_name, SoftDeletedRows};
use iox_tests::{TestCatalog, TestPartition, TestShard, TestTable};
use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
use schema::{sort::SortKey, Projection, Schema};
use sharder::JumpHash;
use std::{sync::Arc, time::Duration};
use tokio::runtime::Handle;
use uuid::Uuid;
@ -23,11 +22,7 @@ pub async fn querier_table(catalog: &Arc<TestCatalog>, table: &Arc<TestTable>) -
catalog.object_store(),
&Handle::current(),
));
let chunk_adapter = Arc::new(ChunkAdapter::new(
catalog_cache,
catalog.metric_registry(),
true,
));
let chunk_adapter = Arc::new(ChunkAdapter::new(catalog_cache, catalog.metric_registry()));
let mut repos = catalog.catalog.repositories().await;
let mut catalog_schema = get_schema_by_name(
@ -48,9 +43,6 @@ pub async fn querier_table(catalog: &Arc<TestCatalog>, table: &Arc<TestTable>) -
.retention_period_ns
.map(|retention| Duration::from_nanos(retention as u64));
QuerierTable::new(QuerierTableArgs {
sharder: Some(Arc::new(JumpHash::new(
(0..1).map(ShardIndex::new).map(Arc::new),
))),
namespace_id: table.namespace.namespace.id,
namespace_name,
namespace_retention_period,
@ -74,7 +66,6 @@ pub(crate) struct IngesterPartitionBuilder {
schema: Schema,
shard: Arc<TestShard>,
partition: Arc<TestPartition>,
ingester_name: Arc<str>,
ingester_chunk_id: u128,
partition_sort_key: Option<Arc<SortKey>>,
@ -93,19 +84,12 @@ impl IngesterPartitionBuilder {
schema,
shard: Arc::clone(shard),
partition: Arc::clone(partition),
ingester_name: Arc::from("ingester1"),
partition_sort_key: None,
ingester_chunk_id: 1,
lp: Vec::new(),
}
}
/// Set the partition chunk id to use when creating partitions
pub(crate) fn with_ingester_chunk_id(mut self, ingester_chunk_id: u128) -> Self {
self.ingester_chunk_id = ingester_chunk_id;
self
}
/// Set the line protocol that will be present in this partition
/// with an iterator of `AsRef<str>`s
pub(crate) fn with_lp(mut self, lp: impl IntoIterator<Item = impl AsRef<str>>) -> Self {
@ -132,7 +116,6 @@ impl IngesterPartitionBuilder {
let data = self.lp.iter().map(|lp| lp_to_record_batch(lp)).collect();
IngesterPartition::new(
Arc::clone(&self.ingester_name),
Some(Uuid::new_v4()),
self.partition.partition.id,
self.shard.shard.id,