feat: add object store service to router (#4338)

Add method to catalog to get parquet file by object store id.
Add gRPC service for object store to get a file by its uuid.
Add the object store service to router2 with object store config.
Paul Dix 2022-04-16 13:58:31 -04:00 committed by GitHub
parent 72a2a68eea
commit 5bf4550259
16 changed files with 386 additions and 2 deletions
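As a rough illustration (not part of this commit), a client could consume the new endpoint like so, assuming the tonic-generated client is exposed from generated_types under tonic's default naming and that a router2 gRPC listener is reachable; the address and the fetch_parquet_file helper are illustrative only:

use generated_types::influxdata::iox::object_store::v1::{
    object_store_service_client::ObjectStoreServiceClient, GetParquetFileByObjectStoreIdRequest,
};

// Hypothetical helper: fetch a parquet file's bytes by its object store uuid.
async fn fetch_parquet_file(uuid: &str) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    // The address is an assumption; point it at the router2 gRPC port.
    let mut client = ObjectStoreServiceClient::connect("http://127.0.0.1:8081").await?;
    let mut stream = client
        .get_parquet_file_by_object_store_id(GetParquetFileByObjectStoreIdRequest {
            uuid: uuid.to_string(),
        })
        .await?
        .into_inner();

    // The response is a stream of byte chunks; concatenate them to rebuild the file.
    let mut bytes = Vec::new();
    while let Some(chunk) = stream.message().await? {
        bytes.extend_from_slice(&chunk.data);
    }
    Ok(bytes)
}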

Cargo.lock generated

@@ -2611,6 +2611,31 @@ dependencies = [
"workspace-hack",
]
[[package]]
name = "iox_object_store_service"
version = "0.1.0"
dependencies = [
"async-trait",
"bytes",
"data_types2",
"futures",
"generated_types",
"iox_catalog",
"iox_object_store",
"metric",
"object_store",
"observability_deps",
"serde",
"serde_urlencoded",
"time 0.1.0",
"tokio",
"tokio-stream",
"tonic 0.6.2",
"trace",
"uuid",
"workspace-hack",
]
[[package]]
name = "iox_tests"
version = "0.1.0"
@@ -4974,11 +4999,13 @@ dependencies = [
"influxdb_line_protocol",
"iox_catalog",
"iox_catalog_service",
"iox_object_store_service",
"lazy_static",
"metric",
"mutable_batch",
"mutable_batch_lp",
"mutable_batch_pb",
"object_store",
"observability_deps",
"parking_lot 0.12.0",
"paste",


@@ -30,6 +30,7 @@ members = [
"iox_data_generator",
"iox_gitops_adapter",
"iox_object_store",
"iox_object_store_service",
"iox_tests",
"ioxd_common",
"ioxd_compactor",


@@ -39,6 +39,7 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
let ingester_path = root.join("influxdata/iox/ingester/v1");
let management_path = root.join("influxdata/iox/management/v1");
let namespace_path = root.join("influxdata/iox/namespace/v1");
let object_store_path = root.join("influxdata/iox/object_store/v1");
let predicate_path = root.join("influxdata/iox/predicate/v1");
let preserved_catalog_path = root.join("influxdata/iox/preserved_catalog/v1");
let querier_path = root.join("influxdata/iox/querier/v1");
@@ -65,6 +66,7 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
management_path.join("server_config.proto"),
management_path.join("service.proto"),
namespace_path.join("service.proto"),
object_store_path.join("service.proto"),
predicate_path.join("predicate.proto"),
preserved_catalog_path.join("catalog.proto"),
preserved_catalog_path.join("parquet_metadata.proto"),


@@ -0,0 +1,17 @@
syntax = "proto3";
package influxdata.iox.object_store.v1;
service ObjectStoreService {
// Get the parquet file from the object store by its uuid
rpc GetParquetFileByObjectStoreId(GetParquetFileByObjectStoreIdRequest) returns (stream GetParquetFileByObjectStoreIdResponse);
}
message GetParquetFileByObjectStoreIdRequest {
// the parquet file object store uuid
string uuid = 1;
}
message GetParquetFileByObjectStoreIdResponse {
// bytes from the parquet file in object store
bytes data = 1;
}


@@ -101,6 +101,19 @@ pub mod influxdata {
}
}
pub mod object_store {
pub mod v1 {
include!(concat!(
env!("OUT_DIR"),
"/influxdata.iox.object_store.v1.rs"
));
include!(concat!(
env!("OUT_DIR"),
"/influxdata.iox.object_store.v1.serde.rs"
));
}
}
pub mod predicate {
pub mod v1 {
include!(concat!(env!("OUT_DIR"), "/influxdata.iox.predicate.v1.rs"));


@@ -336,6 +336,7 @@ pub async fn command(config: Config) -> Result<()> {
&common_state,
Arc::clone(&metrics),
Arc::clone(&catalog),
Arc::clone(&object_store),
&write_buffer_config,
query_pool_name,
)


@@ -9,6 +9,7 @@ use clap_blocks::{
use ioxd_common::server_type::{CommonServerState, CommonServerStateError};
use ioxd_common::Service;
use ioxd_router2::create_router2_server_type;
use object_store::{instrumentation::ObjectStoreMetrics, DynObjectStore, ObjectStoreImpl};
use observability_deps::tracing::*;
use thiserror::Error;
@@ -22,6 +23,9 @@ pub enum Error {
#[error("Invalid config: {0}")]
InvalidConfig(#[from] CommonServerStateError),
#[error("Cannot parse object store config: {0}")]
ObjectStoreParsing(#[from] clap_blocks::object_store::ParseError),
#[error("Creating router: {0}")]
Router(#[from] ioxd_router2::Error),
@@ -74,10 +78,17 @@ pub async fn command(config: Config) -> Result<()> {
.get_catalog("router2", Arc::clone(&metrics))
.await?;
let object_store = ObjectStoreImpl::try_from(config.run_config.object_store_config())
.map_err(Error::ObjectStoreParsing)?;
// Decorate the object store with a metric recorder.
let object_store: Arc<DynObjectStore> =
Arc::new(ObjectStoreMetrics::new(object_store, &*metrics));
let server_type = create_router2_server_type(
&common_state,
Arc::clone(&metrics),
catalog,
object_store,
&config.write_buffer_config,
&config.query_pool_name,
)


@@ -591,6 +591,12 @@ pub trait ParquetFileRepo: Send + Sync {
max_time: Timestamp,
sequence_number: SequenceNumber,
) -> Result<i64>;
/// Return the parquet file with the given object store id
async fn get_by_object_store_id(
&mut self,
object_store_id: Uuid,
) -> Result<Option<ParquetFile>>;
}
/// Functions for working with processed tombstone pointers in the catalog
@@ -1756,6 +1762,14 @@ pub(crate) mod test_helpers {
.await
.unwrap();
// verify we can get it by its object store id
let pfg = repos
.parquet_files()
.get_by_object_store_id(parquet_file.object_store_id)
.await
.unwrap();
assert_eq!(parquet_file, pfg.unwrap());
let metadata = repos
.parquet_files()
.parquet_metadata(parquet_file.id)


@@ -18,6 +18,7 @@ use data_types2::{
SequencerId, Table, TableId, TablePartition, Timestamp, Tombstone, TombstoneId,
};
use observability_deps::tracing::warn;
use sqlx::types::Uuid;
use std::{
collections::{BTreeMap, HashSet},
convert::TryFrom,
@@ -1225,6 +1226,19 @@ impl ParquetFileRepo for MemTxn {
i64::try_from(count).map_err(|_| Error::InvalidValue { value: count })
}
async fn get_by_object_store_id(
&mut self,
object_store_id: Uuid,
) -> Result<Option<ParquetFile>> {
let stage = self.stage();
Ok(stage
.parquet_files
.iter()
.find(|f| f.object_store_id.eq(&object_store_id))
.cloned())
}
}
#[async_trait]


@@ -15,6 +15,7 @@ use data_types2::{
use metric::{Metric, U64Histogram, U64HistogramOptions};
use std::{fmt::Debug, sync::Arc};
use time::{SystemProvider, TimeProvider};
use uuid::Uuid;
/// Decorates an implementation of the catalog's [`RepoCollection`] (and the
/// transactional variant) with instrumentation that emits latency histograms
@@ -280,6 +281,7 @@ decorate!(
"parquet_metadata" = parquet_metadata(&mut self, id: ParquetFileId) -> Result<Vec<u8>>;
"parquet_count" = count(&mut self) -> Result<i64>;
"parquet_count_by_overlaps" = count_by_overlaps(&mut self, table_id: TableId, sequencer_id: SequencerId, min_time: Timestamp, max_time: Timestamp, sequence_number: SequenceNumber) -> Result<i64>;
"parquet_get_by_object_store_id" = get_by_object_store_id(&mut self, object_store_id: Uuid) -> Result<Option<ParquetFile>>;
]
);


@@ -17,6 +17,7 @@ use data_types2::{
SequencerId, Table, TableId, TablePartition, Timestamp, Tombstone, TombstoneId,
};
use observability_deps::tracing::{info, warn};
use sqlx::types::Uuid;
use sqlx::{migrate::Migrator, postgres::PgPoolOptions, Acquire, Executor, Postgres, Row};
use sqlx_hotswap_pool::HotSwapPool;
use std::{collections::HashMap, sync::Arc, time::Duration};
@@ -1781,6 +1782,34 @@ WHERE table_id = $1
Ok(read_result.count)
}
async fn get_by_object_store_id(
&mut self,
object_store_id: Uuid,
) -> Result<Option<ParquetFile>> {
// Deliberately doesn't use `SELECT *` to avoid the performance hit of fetching the large
// `parquet_metadata` column!!
let rec = sqlx::query_as::<_, ParquetFile>(
r#"
SELECT id, sequencer_id, namespace_id, table_id, partition_id, object_store_id,
min_sequence_number, max_sequence_number, min_time, max_time, to_delete, file_size_bytes,
row_count, compaction_level, created_at
FROM parquet_file
WHERE object_store_id = $1;
"#,
)
.bind(&object_store_id) // $1
.fetch_one(&mut self.inner)
.await;
if let Err(sqlx::Error::RowNotFound) = rec {
return Ok(None);
}
let parquet_file = rec.map_err(|e| Error::SqlxError { source: e })?;
Ok(Some(parquet_file))
}
}
#[async_trait]


@@ -0,0 +1,29 @@
[package]
name = "iox_object_store_service"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = "0.1"
data_types2 = { path = "../data_types2" }
futures = "0.3"
generated_types = { path = "../generated_types" }
iox_catalog = { path = "../iox_catalog" }
iox_object_store = { path = "../iox_object_store" }
object_store = { path = "../object_store" }
observability_deps = { path = "../observability_deps" }
serde = "1.0"
serde_urlencoded = "0.7"
time = { path = "../time" }
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
tokio-stream = "0.1"
tonic = "0.6"
trace = { path = "../trace/" }
uuid = { version = "0.8", features = ["v4"] }
workspace-hack = { path = "../workspace-hack"}
[dev-dependencies]
bytes = "1.0"
metric = { path = "../metric" }


@@ -0,0 +1,195 @@
//! gRPC service for getting files from the object store that a remote IOx service is connected to.
//! Used in router2, but can be included in any gRPC server.
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
missing_copy_implementations,
missing_debug_implementations,
missing_docs,
clippy::explicit_iter_loop,
clippy::future_not_send,
clippy::use_self,
clippy::clone_on_ref_ptr
)]
use futures::stream::BoxStream;
use futures::StreamExt;
use generated_types::influxdata::iox::object_store::v1::*;
use iox_catalog::interface::Catalog;
use iox_object_store::ParquetFilePath;
use object_store::DynObjectStore;
use observability_deps::tracing::*;
use std::sync::Arc;
use tonic::{Request, Response, Status};
use uuid::Uuid;
/// Implementation of the ObjectStore gRPC service
#[derive(Debug)]
pub struct ObjectStoreService {
/// Catalog
catalog: Arc<dyn Catalog>,
/// The object store
object_store: Arc<DynObjectStore>,
}
impl ObjectStoreService {
/// Create a new object store service with the given catalog and object store
pub fn new(catalog: Arc<dyn Catalog>, object_store: Arc<DynObjectStore>) -> Self {
Self {
catalog,
object_store,
}
}
}
#[tonic::async_trait]
impl object_store_service_server::ObjectStoreService for ObjectStoreService {
type GetParquetFileByObjectStoreIdStream =
BoxStream<'static, Result<GetParquetFileByObjectStoreIdResponse, Status>>;
async fn get_parquet_file_by_object_store_id(
&self,
request: Request<GetParquetFileByObjectStoreIdRequest>,
) -> Result<Response<Self::GetParquetFileByObjectStoreIdStream>, Status> {
let mut repos = self.catalog.repositories().await;
let req = request.into_inner();
let object_store_id =
Uuid::parse_str(&req.uuid).map_err(|e| Status::invalid_argument(e.to_string()))?;
let parquet_file = repos
.parquet_files()
.get_by_object_store_id(object_store_id)
.await
.map_err(|e| {
warn!(error=%e, %req.uuid, "failed to get parquet_file by object store id");
Status::unknown(e.to_string())
})?
.ok_or_else(|| Status::not_found(req.uuid))?;
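// Rebuild the file's object store path from the ids on the catalog record.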
let path = ParquetFilePath::new_new_gen(
parquet_file.namespace_id,
parquet_file.table_id,
parquet_file.sequencer_id,
parquet_file.partition_id,
parquet_file.object_store_id,
)
.absolute_dirs_and_file_name();
let path = self.object_store.path_from_dirs_and_filename(path);
let res = self
.object_store
.get(&path)
.await
.map_err(|e| Status::unknown(e.to_string()))?;
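// Stream the object's contents back to the caller, one chunk per response message.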
let rx = Box::pin(res.into_stream().map(|next| match next {
Ok(data) => Ok(GetParquetFileByObjectStoreIdResponse {
data: data.to_vec(),
}),
Err(e) => Err(Status::unknown(e.to_string())),
}));
Ok(Response::new(rx))
}
}
#[cfg(test)]
mod tests {
use super::*;
use bytes::Bytes;
use data_types2::{KafkaPartition, ParquetFileParams, SequenceNumber, Timestamp};
use generated_types::influxdata::iox::object_store::v1::object_store_service_server::ObjectStoreService;
use iox_catalog::mem::MemCatalog;
use object_store::{ObjectStoreApi, ObjectStoreImpl};
use uuid::Uuid;
#[tokio::test]
async fn test_get_parquet_file_by_object_store_id() {
// create a catalog and populate it with some test data, then drop the write lock
let p1;
let catalog = {
let metrics = Arc::new(metric::Registry::default());
let catalog = Arc::new(MemCatalog::new(metrics));
let mut repos = catalog.repositories().await;
let kafka = repos
.kafka_topics()
.create_or_get("iox_shared")
.await
.unwrap();
let pool = repos
.query_pools()
.create_or_get("iox_shared")
.await
.unwrap();
let sequencer = repos
.sequencers()
.create_or_get(&kafka, KafkaPartition::new(1))
.await
.unwrap();
let namespace = repos
.namespaces()
.create("catalog_partition_test", "inf", kafka.id, pool.id)
.await
.unwrap();
let table = repos
.tables()
.create_or_get("schema_test_table", namespace.id)
.await
.unwrap();
let partition = repos
.partitions()
.create_or_get("foo", sequencer.id, table.id)
.await
.unwrap();
let p1params = ParquetFileParams {
sequencer_id: sequencer.id,
namespace_id: namespace.id,
table_id: table.id,
partition_id: partition.id,
object_store_id: Uuid::new_v4(),
min_sequence_number: SequenceNumber::new(1),
max_sequence_number: SequenceNumber::new(40),
min_time: Timestamp::new(1),
max_time: Timestamp::new(5),
file_size_bytes: 2343,
parquet_metadata: vec![],
row_count: 29,
compaction_level: 0,
created_at: Timestamp::new(2343),
};
p1 = repos.parquet_files().create(p1params).await.unwrap();
Arc::clone(&catalog)
};
let object_store = Arc::new(ObjectStoreImpl::new_in_memory());
let path = ParquetFilePath::new_new_gen(
p1.namespace_id,
p1.table_id,
p1.sequencer_id,
p1.partition_id,
p1.object_store_id,
)
.absolute_dirs_and_file_name();
let path = object_store.path_from_dirs_and_filename(path);
let data = Bytes::from_static(b"some data");
object_store.put(&path, data.clone()).await.unwrap();
let grpc = super::ObjectStoreService::new(catalog, object_store);
let request = GetParquetFileByObjectStoreIdRequest {
uuid: p1.object_store_id.to_string(),
};
let tonic_response = grpc
.get_parquet_file_by_object_store_id(Request::new(request))
.await
.expect("rpc request should succeed");
let mut response = tonic_response.into_inner();
let response = response.next().await.unwrap().unwrap();
assert_eq!(response.data, data);
}
}

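The crate-level doc comment above notes that the service can be included in any gRPC server. As a standalone sketch (not part of this commit), it could be mounted on a bare tonic server roughly as follows, reusing the in-memory catalog and object store from the test; the listen address is arbitrary and this assumes tonic's transport feature is enabled:

use std::sync::Arc;

use generated_types::influxdata::iox::object_store::v1::object_store_service_server::ObjectStoreServiceServer;
use iox_catalog::mem::MemCatalog;
use iox_object_store_service::ObjectStoreService;
use object_store::ObjectStoreImpl;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // In-memory backends for illustration; a real deployment would use the
    // postgres catalog and a durable object store.
    let metrics = Arc::new(metric::Registry::default());
    let catalog = Arc::new(MemCatalog::new(metrics));
    let object_store = Arc::new(ObjectStoreImpl::new_in_memory());

    let svc = ObjectStoreService::new(catalog, object_store);

    // Wrap the service in the generated tonic server type and serve it.
    tonic::transport::Server::builder()
        .add_service(ObjectStoreServiceServer::new(svc))
        .serve("127.0.0.1:8082".parse()?)
        .await?;
    Ok(())
}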

@@ -37,6 +37,7 @@ use ioxd_common::{
server_type::{CommonServerState, RpcError, ServerType},
setup_builder,
};
use object_store::DynObjectStore;
#[derive(Debug, Error)]
pub enum Error {
@@ -107,6 +108,7 @@ where
add_service!(builder, self.server.grpc().write_service());
add_service!(builder, self.server.grpc().schema_service());
add_service!(builder, self.server.grpc().catalog_service());
add_service!(builder, self.server.grpc().object_store_service());
serve_builder!(builder);
Ok(())
@@ -146,6 +148,7 @@ pub async fn create_router2_server_type(
common_state: &CommonServerState,
metrics: Arc<metric::Registry>,
catalog: Arc<dyn Catalog>,
object_store: Arc<DynObjectStore>,
write_buffer_config: &WriteBufferConfig,
query_pool_name: &str,
) -> Result<Arc<dyn ServerType>> {
@@ -265,7 +268,12 @@ pub async fn create_router2_server_type(
Arc::clone(&handler_stack),
&metrics,
);
let grpc = GrpcDelegate::new(handler_stack, schema_catalog, Arc::clone(&metrics));
let grpc = GrpcDelegate::new(
handler_stack,
schema_catalog,
object_store,
Arc::clone(&metrics),
);
let router_server = RouterServer::new(http, grpc, metrics, common_state.trace_collector());
let server_type = Arc::new(RouterServerType::new(router_server, common_state));


@@ -18,10 +18,12 @@ hyper = "0.14"
influxdb_line_protocol = { version = "0.1.0", path = "../influxdb_line_protocol" }
iox_catalog = { path = "../iox_catalog" }
iox_catalog_service = { path = "../iox_catalog_service"}
iox_object_store_service = { path = "../iox_object_store_service" }
metric = { path = "../metric" }
mutable_batch = { path = "../mutable_batch" }
mutable_batch_lp = { path = "../mutable_batch_lp" }
mutable_batch_pb = { version = "0.1.0", path = "../mutable_batch_pb" }
object_store = { path = "../object_store" }
observability_deps = { path = "../observability_deps" }
parking_lot = "0.12"
predicate = { path = "../predicate" }


@@ -5,15 +5,17 @@ use std::sync::Arc;
use generated_types::{
google::FieldViolation,
influxdata::{
iox::{catalog::v1::*, schema::v1::*},
iox::{catalog::v1::*, object_store::v1::*, schema::v1::*},
pbdata::v1::*,
},
};
use hashbrown::HashMap;
use iox_catalog::interface::{get_schema_by_name, Catalog};
use iox_catalog_service::CatalogService;
use iox_object_store_service::ObjectStoreService;
use metric::U64Counter;
use mutable_batch::MutableBatch;
use object_store::DynObjectStore;
use observability_deps::tracing::*;
use schema::selection::Selection;
use std::ops::DerefMut;
@@ -29,6 +31,7 @@ use crate::dml_handlers::{DmlError, DmlHandler, PartitionError};
pub struct GrpcDelegate<D> {
dml_handler: Arc<D>,
catalog: Arc<dyn Catalog>,
object_store: Arc<DynObjectStore>,
metrics: Arc<metric::Registry>,
}
@@ -38,11 +41,13 @@ impl<D> GrpcDelegate<D> {
pub fn new(
dml_handler: Arc<D>,
catalog: Arc<dyn Catalog>,
object_store: Arc<DynObjectStore>,
metrics: Arc<metric::Registry>,
) -> Self {
Self {
dml_handler,
catalog,
object_store,
metrics,
}
}
@@ -86,6 +91,20 @@ where
&self.catalog,
)))
}
/// Acquire an [`ObjectStoreService`] gRPC service implementation.
///
/// [`ObjectStoreService`]: generated_types::influxdata::iox::object_store::v1::object_store_service_server::ObjectStoreService
pub fn object_store_service(
&self,
) -> object_store_service_server::ObjectStoreServiceServer<
impl object_store_service_server::ObjectStoreService,
> {
object_store_service_server::ObjectStoreServiceServer::new(ObjectStoreService::new(
Arc::clone(&self.catalog),
Arc::clone(&self.object_store),
))
}
}
#[derive(Debug)]