fix: Do not specify a querier address by default (#4289)
parent
17e11d6139
commit
bd4566cbe0
|
@ -18,16 +18,21 @@ pub struct QuerierConfig {
|
|||
num_query_threads: Option<usize>,
|
||||
|
||||
/// gRPC address for the querier to talk with the ingester. For
|
||||
/// example "http://127.0.0.1:8083" or
|
||||
/// "http://10.10.10.1:8083,http://10.10.10.2:8083" for multiple
|
||||
/// addresses.
|
||||
/// example:
|
||||
///
|
||||
/// "http://127.0.0.1:8083"
|
||||
///
|
||||
/// or
|
||||
///
|
||||
/// "http://10.10.10.1:8083,http://10.10.10.2:8083"
|
||||
///
|
||||
/// for multiple addresses.
|
||||
///
|
||||
/// Note we plan to improve this interface in
|
||||
/// <https://github.com/influxdata/influxdb_iox/issues/3996>
|
||||
#[clap(
|
||||
long = "--ingester-address",
|
||||
env = "INFLUXDB_IOX_INGESTER_ADDRESSES",
|
||||
default_value = "http://127.0.0.1:8083",
|
||||
multiple_values = true,
|
||||
use_value_delimiter = true
|
||||
)]
|
||||
|
@ -70,10 +75,7 @@ mod tests {
|
|||
let actual = QuerierConfig::try_parse_from(["my_binary"]).unwrap();
|
||||
|
||||
assert_eq!(actual.num_query_threads(), None);
|
||||
assert_eq!(
|
||||
actual.ingester_addresses().unwrap(),
|
||||
&["http://127.0.0.1:8083".to_string()]
|
||||
);
|
||||
assert!(actual.ingester_addresses().unwrap().is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -82,10 +84,7 @@ mod tests {
|
|||
QuerierConfig::try_parse_from(["my_binary", "--num-query-threads", "42"]).unwrap();
|
||||
|
||||
assert_eq!(actual.num_query_threads(), Some(42));
|
||||
assert_eq!(
|
||||
actual.ingester_addresses().unwrap(),
|
||||
&["http://127.0.0.1:8083".to_string()]
|
||||
);
|
||||
assert!(actual.ingester_addresses().unwrap().is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
|
@ -104,3 +104,53 @@ async fn basic_on_parquet() {
|
|||
];
|
||||
assert_batches_sorted_eq!(&expected, &batches);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn basic_no_ingster_connection() {
|
||||
let database_url = maybe_skip_integration!();
|
||||
|
||||
let table_name = "the_table";
|
||||
|
||||
let router2_config = TestConfig::new_router2(&database_url);
|
||||
// fast parquet
|
||||
let ingester_config = TestConfig::new_ingester(&router2_config).with_fast_parquet_generation();
|
||||
|
||||
// specially create a querier config that is NOT connected to the ingester
|
||||
let querier_config = TestConfig::new_querier_without_ingester(&ingester_config);
|
||||
|
||||
// Set up the cluster ====================================
|
||||
let cluster = MiniCluster::new()
|
||||
.with_router2(router2_config)
|
||||
.await
|
||||
.with_ingester(ingester_config)
|
||||
.await
|
||||
.with_querier(querier_config)
|
||||
.await;
|
||||
|
||||
// Write some data into the v2 HTTP API ==============
|
||||
let lp = format!("{},tag1=A,tag2=B val=42i 123456", table_name);
|
||||
let response = cluster.write_to_router(lp).await;
|
||||
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
||||
|
||||
// Wait for data to be persisted to parquet
|
||||
let write_token = get_write_token(&response);
|
||||
wait_for_persisted(write_token, cluster.ingester().ingester_grpc_connection()).await;
|
||||
|
||||
// run query
|
||||
let sql = format!("select * from {}", table_name);
|
||||
let batches = run_query(
|
||||
sql,
|
||||
cluster.namespace(),
|
||||
cluster.querier().querier_grpc_connection(),
|
||||
)
|
||||
.await;
|
||||
|
||||
let expected = [
|
||||
"+------+------+--------------------------------+-----+",
|
||||
"| tag1 | tag2 | time | val |",
|
||||
"+------+------+--------------------------------+-----+",
|
||||
"| A | B | 1970-01-01T00:00:00.000123456Z | 42 |",
|
||||
"+------+------+--------------------------------+-----+",
|
||||
];
|
||||
assert_batches_sorted_eq!(&expected, &batches);
|
||||
}
|
||||
|
|
|
@ -72,12 +72,18 @@ impl TestConfig {
|
|||
let ingester_address =
|
||||
Arc::clone(&ingester_config.addrs().ingester_grpc_api().client_base());
|
||||
|
||||
Self::new(ServerType::Querier, ingester_config.dsn())
|
||||
.with_existing_object_store(ingester_config)
|
||||
Self::new_querier_without_ingester(ingester_config)
|
||||
// Configure to talk with the ingester
|
||||
.with_ingester_addresses(&[ingester_address.as_ref()])
|
||||
}
|
||||
|
||||
/// Create a minimal querier configuration from the specified
|
||||
/// ingester configuration, using the same dsn and object store
|
||||
pub fn new_querier_without_ingester(ingester_config: &TestConfig) -> Self {
|
||||
Self::new(ServerType::Querier, ingester_config.dsn())
|
||||
.with_existing_object_store(ingester_config)
|
||||
}
|
||||
|
||||
/// Create a minimal all in one configuration
|
||||
pub fn new_all_in_one(dsn: impl Into<String>) -> Self {
|
||||
Self::new(ServerType::AllInOne, dsn)
|
||||
|
|
Loading…
Reference in New Issue