chore: router namespace api (#6151)

* chore: move ns api from querier to router

* chore: add explanatory comment in querier about moved namespace API

* fix: add namespace service to router

* fix: querier returns unimplemented error for ns retention, not panic

* chore: reuse namespace -> proto in router ns api

* chore: grpc namespace - consume ns to avoid clone

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Luke Bond 2022-11-16 15:25:49 +00:00 committed by GitHub
parent fbe9f27f10
commit 9365d933f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 174 additions and 74 deletions

16
Cargo.lock generated
View File

@ -4335,6 +4335,7 @@ dependencies = [
"serde_json",
"serde_urlencoded",
"service_grpc_catalog",
"service_grpc_namespace",
"service_grpc_object_store",
"service_grpc_schema",
"sharder",
@ -4658,6 +4659,21 @@ dependencies = [
"workspace-hack",
]
[[package]]
name = "service_grpc_namespace"
version = "0.1.0"
dependencies = [
"data_types",
"generated_types",
"iox_catalog",
"iox_tests",
"metric",
"observability_deps",
"tokio",
"tonic",
"workspace-hack",
]
[[package]]
name = "service_grpc_object_store"
version = "0.1.0"

View File

@ -59,6 +59,7 @@ members = [
"service_common",
"service_grpc_influxrpc",
"service_grpc_flight",
"service_grpc_namespace",
"service_grpc_object_store",
"service_grpc_catalog",
"service_grpc_schema",

View File

@ -5,6 +5,9 @@ option go_package = "github.com/influxdata/iox/namespace/v1";
service NamespaceService {
// Get all namespaces
rpc GetNamespaces(GetNamespacesRequest) returns (GetNamespacesResponse);
// Update retention period
rpc UpdateNamespaceRetention(UpdateNamespaceRetentionRequest) returns (UpdateNamespaceRetentionResponse);
}
message GetNamespacesRequest {
@ -14,10 +17,25 @@ message GetNamespacesResponse {
repeated Namespace namespaces = 1;
}
message UpdateNamespaceRetentionRequest {
// Name of the namespace to be set
string name = 1;
// Number of hours of the retention period
int64 retention_hours = 2;
}
message UpdateNamespaceRetentionResponse {
Namespace namespace = 1;
}
message Namespace {
// Namespace ID
int64 id = 1;
// Name of the Namespace
string name = 2;
// Retention period ns
optional int64 retention_period_ns = 3;
}

View File

@ -5,9 +5,6 @@ option go_package = "github.com/influxdata/iox/schema/v1";
service SchemaService {
// Get the schema for a namespace
rpc GetSchema(GetSchemaRequest) returns (GetSchemaResponse);
// update retention period
rpc UpdateNamespaceRetention(UpdateNamespaceRetentionRequest) returns (UpdateNamespaceRetentionResponse);
}
message GetSchemaRequest {
@ -63,26 +60,3 @@ message ColumnSchema {
COLUMN_TYPE_TAG = 7;
}
}
message UpdateNamespaceRetentionRequest {
// Name of the namespace to be set
string name = 1;
// Number of hours of the retention period
int64 retention_hours = 2;
}
message UpdateNamespaceRetentionResponse {
Namespace namespace = 1;
}
message Namespace {
// Namespace ID
int64 id = 1;
// Name of the Namespace
string name = 2;
// Retention period ns
optional int64 retention_period_ns = 3;
}

View File

@ -33,7 +33,7 @@ pub async fn command(
retention_hours,
} = config;
let mut client = influxdb_iox_client::schema::Client::new(connection);
let mut client = influxdb_iox_client::namespace::Client::new(connection);
let namespace = client
.update_namespace_retention(&namespace, retention_hours.try_into().unwrap())
.await?;

View File

@ -3,6 +3,7 @@ use client_util::connection::GrpcConnection;
use self::generated_types::{namespace_service_client::NamespaceServiceClient, *};
use crate::connection::Connection;
use crate::error::Error;
use ::generated_types::google::OptionalField;
/// Re-export generated_types
pub mod generated_types {
@ -29,4 +30,21 @@ impl Client {
Ok(response.into_inner().namespaces)
}
/// Update retention for a namespace
pub async fn update_namespace_retention(
&mut self,
namespace: &str,
retention_hours: i64,
) -> Result<Namespace, Error> {
let response = self
.inner
.update_namespace_retention(UpdateNamespaceRetentionRequest {
name: namespace.to_string(),
retention_hours,
})
.await?;
Ok(response.into_inner().namespace.unwrap_field("namespace")?)
}
}

View File

@ -35,21 +35,4 @@ impl Client {
Ok(response.into_inner().schema.unwrap_field("schema")?)
}
/// Update retention for a namespace
pub async fn update_namespace_retention(
&mut self,
namespace: &str,
retention_hours: i64,
) -> Result<Namespace, Error> {
let response = self
.inner
.update_namespace_retention(UpdateNamespaceRetentionRequest {
name: namespace.to_string(),
retention_hours,
})
.await?;
Ok(response.into_inner().namespace.unwrap_field("namespace")?)
}
}

View File

@ -1,4 +1,9 @@
//! NamespaceService gRPC implementation
//!
//! NOTE: this is present here in the querier to support a debug use-case that is handy in
//! production, namely `kubectl exec`ing into the querier pod and using the REPL. the namespace API
//! belongs in the router and has been moved there, but this is kept here in partial form to
//! support `show namespaces` in the REPL.
use data_types::Namespace;
use generated_types::influxdata::iox::namespace::v1 as proto;
@ -30,6 +35,7 @@ fn namespace_to_proto(namespace: Namespace) -> proto::Namespace {
proto::Namespace {
id: namespace.id.get(),
name: namespace.name,
retention_period_ns: namespace.retention_period_ns,
}
}
@ -49,6 +55,15 @@ impl proto::namespace_service_server::NamespaceService for NamespaceServiceImpl
namespaces,
}))
}
async fn update_namespace_retention(
&self,
_request: tonic::Request<proto::UpdateNamespaceRetentionRequest>,
) -> Result<tonic::Response<proto::UpdateNamespaceRetentionResponse>, tonic::Status> {
Err(tonic::Status::unimplemented(
"use router instances to manage namespaces",
))
}
}
#[cfg(test)]
@ -134,10 +149,12 @@ mod tests {
proto::Namespace {
id: 1,
name: "namespace2".to_string(),
retention_period_ns: None,
},
proto::Namespace {
id: 2,
name: "namespace1".to_string(),
retention_period_ns: None,
},
]
}

View File

@ -129,6 +129,7 @@ where
add_service!(builder, self.server.grpc().catalog_service());
add_service!(builder, self.server.grpc().object_store_service());
add_service!(builder, self.server.grpc().shard_service());
add_service!(builder, self.server.grpc().namespace_service());
serve_builder!(builder);
Ok(())

View File

@ -30,6 +30,7 @@ serde = "1.0"
serde_json = "1.0.87"
serde_urlencoded = "0.7"
service_grpc_catalog = { path = "../service_grpc_catalog"}
service_grpc_namespace = { path = "../service_grpc_namespace"}
service_grpc_schema = { path = "../service_grpc_schema" }
service_grpc_object_store = { path = "../service_grpc_object_store" }
sharder = { path = "../sharder" }

View File

@ -6,11 +6,12 @@ use std::sync::Arc;
use ::sharder::Sharder;
use generated_types::influxdata::iox::{
catalog::v1::*, object_store::v1::*, schema::v1::*, sharder::v1::*,
catalog::v1::*, namespace::v1::*, object_store::v1::*, schema::v1::*, sharder::v1::*,
};
use iox_catalog::interface::Catalog;
use object_store::DynObjectStore;
use service_grpc_catalog::CatalogService;
use service_grpc_namespace::NamespaceService;
use service_grpc_object_store::ObjectStoreService;
use service_grpc_schema::SchemaService;
@ -87,4 +88,15 @@ where
) -> shard_service_server::ShardServiceServer<impl shard_service_server::ShardService> {
shard_service_server::ShardServiceServer::new(self.shard_service.clone())
}
/// Acquire a [`NamespaceService`] gRPC service implementation.
///
/// [`NamespaceService`]: generated_types::influxdata::iox::namespace::v1::namespace_service_server::NamespaceService.
pub fn namespace_service(
&self,
) -> namespace_service_server::NamespaceServiceServer<NamespaceService> {
namespace_service_server::NamespaceServiceServer::new(NamespaceService::new(Arc::clone(
&self.catalog,
)))
}
}

View File

@ -0,0 +1,19 @@
[package]
name = "service_grpc_namespace"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
data_types = { path = "../data_types" }
generated_types = { path = "../generated_types" }
observability_deps = { path = "../observability_deps" }
tonic = "0.8"
iox_catalog = { path = "../iox_catalog" }
workspace-hack = { path = "../workspace-hack"}
[dev-dependencies]
iox_tests = { path = "../iox_tests" }
metric = { path = "../metric" }
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }

View File

@ -0,0 +1,68 @@
//! Implementation of the namespace gRPC service
use std::sync::Arc;
use data_types::Namespace as CatalogNamespace;
use generated_types::influxdata::iox::namespace::v1::*;
use iox_catalog::interface::Catalog;
use observability_deps::tracing::warn;
use tonic::{Request, Response, Status};
/// Implementation of the gRPC namespace service
#[derive(Debug)]
pub struct NamespaceService {
/// Catalog.
catalog: Arc<dyn Catalog>,
}
impl NamespaceService {
pub fn new(catalog: Arc<dyn Catalog>) -> Self {
Self { catalog }
}
}
#[tonic::async_trait]
impl namespace_service_server::NamespaceService for NamespaceService {
async fn get_namespaces(
&self,
_request: Request<GetNamespacesRequest>,
) -> Result<Response<GetNamespacesResponse>, Status> {
let mut repos = self.catalog.repositories().await;
let namespaces = repos.namespaces().list().await.map_err(|e| {
warn!(error=%e, "failed to retrieve namespaces from catalog");
Status::not_found(e.to_string())
})?;
Ok(Response::new(GetNamespacesResponse {
namespaces: namespaces.into_iter().map(namespace_to_proto).collect(),
}))
}
async fn update_namespace_retention(
&self,
request: Request<UpdateNamespaceRetentionRequest>,
) -> Result<Response<UpdateNamespaceRetentionResponse>, Status> {
let mut repos = self.catalog.repositories().await;
let req = request.into_inner();
let namespace = repos
.namespaces()
.update_retention_period(&req.name, req.retention_hours)
.await
.map_err(|e| {
warn!(error=%e, %req.name, "failed to update namespace retention");
Status::not_found(e.to_string())
})?;
Ok(Response::new(UpdateNamespaceRetentionResponse {
namespace: Some(namespace_to_proto(namespace)),
}))
}
}
fn namespace_to_proto(namespace: CatalogNamespace) -> Namespace {
Namespace {
id: namespace.id.get(),
name: namespace.name.clone(),
retention_period_ns: namespace.retention_period_ns,
}
}

View File

@ -3,7 +3,7 @@
use std::{ops::DerefMut, sync::Arc};
use generated_types::influxdata::iox::schema::v1::*;
use iox_catalog::interface::{get_schema_by_name, update_namespace_retention, Catalog};
use iox_catalog::interface::{get_schema_by_name, Catalog};
use observability_deps::tracing::warn;
use tonic::{Request, Response, Status};
@ -38,24 +38,6 @@ impl schema_service_server::SchemaService for SchemaService {
.map(Arc::new)?;
Ok(Response::new(schema_to_proto(schema)))
}
async fn update_namespace_retention(
&self,
request: Request<UpdateNamespaceRetentionRequest>,
) -> Result<Response<UpdateNamespaceRetentionResponse>, Status> {
let mut repos = self.catalog.repositories().await;
let req = request.into_inner();
let namespace =
update_namespace_retention(&req.name, req.retention_hours, repos.deref_mut())
.await
.map_err(|e| {
warn!(error=%e, %req.name, "failed to update namespace retention");
Status::not_found(e.to_string())
})
.map(Arc::new)?;
Ok(Response::new(namespace_to_proto(namespace)))
}
}
fn schema_to_proto(schema: Arc<data_types::NamespaceSchema>) -> GetSchemaResponse {
@ -94,16 +76,6 @@ fn schema_to_proto(schema: Arc<data_types::NamespaceSchema>) -> GetSchemaRespons
response
}
fn namespace_to_proto(namespace: Arc<data_types::Namespace>) -> UpdateNamespaceRetentionResponse {
UpdateNamespaceRetentionResponse {
namespace: Some(Namespace {
id: namespace.id.get(),
name: namespace.name.clone(),
retention_period_ns: namespace.retention_period_ns,
}),
}
}
#[cfg(test)]
mod tests {
use super::*;