From 1df7a0d4fb2bc2ae38765e8c28092602824b5bf7 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 19 Oct 2022 14:39:05 -0400 Subject: [PATCH] refactor: remove outdated observer sql repl mode (#5918) * refactor: remove Observer mode from repl * chore: remove outdated SQL docs * fix: more update of sql docs --- docs/sql.md | 133 +------- influxdb_iox/src/commands/sql.rs | 1 - influxdb_iox/src/commands/sql/observer.rs | 296 ------------------ influxdb_iox/src/commands/sql/repl.rs | 36 +-- influxdb_iox/src/commands/sql/repl_command.rs | 33 -- 5 files changed, 5 insertions(+), 494 deletions(-) delete mode 100644 influxdb_iox/src/commands/sql/observer.rs diff --git a/docs/sql.md b/docs/sql.md index 278e75c4fe..977da2f6ec 100644 --- a/docs/sql.md +++ b/docs/sql.md @@ -32,7 +32,7 @@ Then run the `USE ` command ``` > use 810c5937734635d8_dbce66e3a6cbe757; You are now in remote mode, querying namespace 810c5937734635d8_dbce66e3a6cbe757 -810c5937734635d8_dbce66e3a6cbe757> +810c5937734635d8_dbce66e3a6cbe757> ``` Now, all queries will be run against the specified namespace (`810c5937734635d8_dbce66e3a6cbe757`) in this example @@ -67,31 +67,6 @@ Returned 1 row in 74.022768ms ``` -## Observer -In this mode queries are run *locally* against a cached unified view of the remote system tables - -``` -810c5937734635d8_dbce66e3a6cbe757> observer -; -Preparing local views of remote system tables -Loading system tables from 3 databases -... - Completed in 112.085784ms -You are now in Observer mode. - -SQL commands in this mode run against a cached unified view of -remote system tables in all remote databases. - -To see the unified tables available to you, try running -SHOW TABLES; - -To reload the most recent version of the database system tables, run -OBSERVER; - - -OBSERVER> -``` - # Query Cookbook This section contains some common and useful queries against IOx system tables @@ -108,9 +83,6 @@ my_db> show tables; | table_catalog | table_schema | table_name | table_type | +---------------+--------------------+-------------+------------+ | public | iox | query_count | BASE TABLE | -| public | system | chunks | BASE TABLE | -| public | system | columns | BASE TABLE | -| public | system | operations | BASE TABLE | | public | information_schema | tables | VIEW | | public | information_schema | columns | VIEW | +---------------+--------------------+-------------+------------+ @@ -149,96 +121,6 @@ Query execution complete in 39.046225ms ``` - -## System Tables - -Here are some interesting reports you can run when in `OBSERVER` mode: - -### Total storage size taken by each database - -```sql -SELECT - database_name, count(*) as num_chunks, - sum(memory_bytes)/1024/1024 as estimated_mb -FROM chunks -GROUP BY database_name -ORDER BY estimated_mb desc -LIMIT 20; -``` - -### Total estimated storage size by database and storage class -```sql -SELECT - database_name, storage, count(*) as num_chunks, - sum(memory_bytes)/1024/1024 as estimated_mb -FROM chunks -GROUP BY database_name, storage -ORDER BY estimated_mb desc -LIMIT 20; -``` - -### Total estimated storage size by database, table_name and storage class - -```sql -SELECT - database_name, table_name, storage, count(*) as num_chunks, - sum(memory_bytes)/1024/1024 as estimated_mb -FROM chunks -GROUP BY database_name, table_name, storage -ORDER BY estimated_mb desc -LIMIT 20; -``` - - -### Total row count by table - -```sql -SELECT database_name, table_name, sum(total_rows) as total_rows -FROM ( - SELECT database_name, table_name, max(row_count) as total_rows - FROM chunk_columns - GROUP BY database_name, partition_key, table_name -) -GROUP BY database_name, table_name -ORDER BY total_rows DESC -LIMIT 20; -``` - -### Total row count by partition and table - -```sql -SELECT database_name, partition_key, table_name, max(row_count) as total_rows -FROM chunk_columns -GROUP BY database_name, partition_key, table_name -ORDER BY total_rows DESC -LIMIT 20; -``` - -### Time range stored per table - -This query provides an estimate, by table, of how long of a time range -and the estimated number of rows per second it holds in IOx -(the `1,000,000,000` is the conversion from nanoseconds) - -```sql -select table_name, -1000000000.0 * total_rows / range as rows_per_sec, -range / 1000000000.0 as range_sec, -total_rows -from -(select table_name, - column_name, - sum(row_count) as total_rows, - max(cast(max_value as double)) - min(cast(min_value as double)) as range - from chunk_columns - where column_name = 'time' - group by table_name, column_name -) -where range > 0 -order by range_sec desc; -``` - - # SQL Reference Since IOx uses Apache Arrow's @@ -249,14 +131,7 @@ In this section, IOx specific SQL tables, commands, and extensions are documente ## System Tables -In addition to the SQL standard `information_schema`, IOx contains several *system tables* that provide access to IOx specific information. The information in each system table is scoped to that particular database. Cross database queries are not possible due to the design of IOx's security model. Another process, such as the `observer` mode in the IOx SQL client, must be used for queries on information that spans databases. +In addition to the SQL standard `information_schema`, IOx contains several *system tables* that provide access to IOx specific information. The information in each system table is scoped to that particular database. Cross database queries are not possible due to the design of IOx's security model. -### `system.chunks` -`system.chunks` contains information about each IOx storage chunk (which holds part of the data for a table). - -TODO: document each column, once they have stabilized. - -### `system.columns` -`system.columns` contains IOx specific schema information about each column in each table, such as which columns were loaded as tags, fields, and timestamps in the InfluxDB data model. - -TODO: document each column, once they have stabilized. +### `system.queries` +`system.queries` contains information about queries run against this IOx instance diff --git a/influxdb_iox/src/commands/sql.rs b/influxdb_iox/src/commands/sql.rs index 2a532b83df..adf550b0ec 100644 --- a/influxdb_iox/src/commands/sql.rs +++ b/influxdb_iox/src/commands/sql.rs @@ -5,7 +5,6 @@ use snafu::{ResultExt, Snafu}; use influxdb_iox_client::{connection::Connection, health}; -mod observer; mod repl; mod repl_command; diff --git a/influxdb_iox/src/commands/sql/observer.rs b/influxdb_iox/src/commands/sql/observer.rs deleted file mode 100644 index 6622e19bb8..0000000000 --- a/influxdb_iox/src/commands/sql/observer.rs +++ /dev/null @@ -1,296 +0,0 @@ -//! This module implements the "Observer" functionality of the SQL repl - -use arrow::{ - array::{Array, ArrayRef, StringArray}, - datatypes::{Field, Schema}, - record_batch::RecordBatch, -}; -use datafusion::{ - datasource::MemTable, - prelude::{SessionConfig, SessionContext}, -}; -use influxdb_iox_client::{connection::Connection, flight::generated_types::ReadInfo}; -use observability_deps::tracing::{debug, info}; -use snafu::{ResultExt, Snafu}; -use std::{collections::HashMap, sync::Arc, time::Instant}; - -#[derive(Debug, Snafu)] -pub enum Error { - #[snafu(display("Generic error"))] - Generic, - - #[snafu(display("Error loading remote state: {}", source))] - LoadingDatabaseNames { - source: influxdb_iox_client::error::Error, - }, - - #[snafu(display("Error running remote query: {}", source))] - RunningRemoteQuery { - source: influxdb_iox_client::flight::Error, - }, - - #[snafu(display("Error running observer query: {}", source))] - Query { - source: datafusion::error::DataFusionError, - }, -} - -pub type Result = std::result::Result; - -/// The Observer contains a local DataFusion execution engine that has -/// pre-loaded with consolidated system table views. -pub struct Observer { - /// DataFusion execution context for executing queries - context: SessionContext, -} - -impl Observer { - /// Attempt to create a new observer instance, loading from the remote server - pub async fn try_new(connection: Connection) -> Result { - let mut context = - SessionContext::with_config(SessionConfig::new().with_information_schema(true)); - - load_remote_system_tables(&mut context, connection).await?; - - Ok(Self { context }) - } - - /// Runs the specified sql query locally against the preloaded context - pub async fn run_query(&mut self, sql: &str) -> Result> { - self.context - .sql(sql) - .await - .context(QuerySnafu)? - .collect() - .await - .context(QuerySnafu) - } - - pub fn help(&self) -> String { - r#"You are now in Observer mode. - -SQL commands in this mode run against a cached unified view of -remote system tables in all remote databases. - -To see the unified tables available to you, try running -SHOW TABLES; - -To reload the most recent version of the database system tables, run -OBSERVER; - -"# - .to_string() - } -} - -/// Copies the data from the remote tables across all databases in a -/// remote server into a local copy that also has an extra -/// `database_name` column for the database -async fn load_remote_system_tables( - context: &mut SessionContext, - connection: Connection, -) -> Result<()> { - // all prefixed with "system." - let table_names = vec!["queries"]; - - let start = Instant::now(); - - let mut namespace_client = influxdb_iox_client::namespace::Client::new(connection.clone()); - - let db_names: Vec<_> = namespace_client - .get_namespaces() - .await - .context(LoadingDatabaseNamesSnafu)? - .into_iter() - .map(|ns| ns.name) - .collect(); - - println!("Loading system tables from {} databases", db_names.len()); - - let tasks = db_names - .into_iter() - .flat_map(|db_name| { - let table_names = table_names.clone(); - let connection = connection.clone(); - table_names.into_iter().map(move |table_name| { - let table_name = table_name.to_string(); - let db_name = db_name.to_string(); - let connection = connection.clone(); - let sql = format!("select * from system.{}", table_name); - tokio::task::spawn(async move { - let mut client = influxdb_iox_client::flight::Client::new(connection); - let mut query_results = client - .perform_query(ReadInfo { - namespace_name: db_name.clone(), - sql_query: sql, - }) - .await - .context(RunningRemoteQuerySnafu)?; - - let mut batches = vec![]; - - while let Some(data) = query_results - .next() - .await - .context(RunningRemoteQuerySnafu)? - { - batches.push(data); - } - - let t: Result = Ok(RemoteSystemTable { - db_name, - table_name, - batches, - }); - print!("."); // give some indication of progress - use std::io::Write; - std::io::stdout().flush().unwrap(); - t - }) - }) - }) - .collect::>(); - - // now, get the results and combine them - let results = futures::future::join_all(tasks).await; - - let mut builder = AggregatedTableBuilder::new(); - results.into_iter().for_each(|result| { - match result { - Ok(Ok(table)) => { - builder.append(table); - } - // This is not a fatal error so log it and keep going - Ok(Err(e)) => { - println!("WARNING: Error running query: {}", e); - } - // This is not a fatal error so log it and keep going - Err(e) => { - println!("WARNING: Error running task: {}", e); - } - } - }); - - println!(); - println!(" Completed in {:?}", Instant::now() - start); - - builder.build(context); - - Ok(()) -} - -#[derive(Debug)] -/// Contains the results from a system table query for a specific database -struct RemoteSystemTable { - db_name: String, - table_name: String, - batches: Vec, -} - -#[derive(Debug, Default)] -/// Aggregates several table responses into a unified view -struct AggregatedTableBuilder { - tables: HashMap, -} - -impl AggregatedTableBuilder { - fn new() -> Self { - Self::default() - } - - /// Appends a table response to the aggregated tables being built - fn append(&mut self, t: RemoteSystemTable) { - let RemoteSystemTable { - db_name, - table_name, - batches, - } = t; - - let num_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); - - info!(%table_name, %db_name, num_batches=batches.len(), %num_rows, "Aggregating results"); - - let table_builder = self - .tables - .entry(table_name.clone()) - .or_insert_with(|| VirtualTableBuilder::new(table_name)); - - table_builder.append_batches(&db_name, batches); - } - - /// register a table provider for this system table - fn build(self, ctx: &mut SessionContext) { - let Self { tables } = self; - - for (table_name, table_builder) in tables { - debug!(%table_name, "registering system table"); - table_builder.build(ctx); - } - } -} - -/// Creates a "virtual" version of `select * from ` which has a -/// "database_name" column pre-pended to all actual record batches -#[derive(Debug)] -struct VirtualTableBuilder { - table_name: String, - batches: Vec, -} - -impl VirtualTableBuilder { - pub fn new(table_name: impl Into) -> Self { - let table_name = table_name.into(); - Self { - table_name, - batches: Vec::new(), - } - } - - /// Append batches from `select * from ` to the - /// results being created - fn append_batches(&mut self, db_name: &str, new_batches: Vec) { - self.batches.extend(new_batches.into_iter().map(|batch| { - use std::iter::once; - - let array = - StringArray::from_iter_values(std::iter::repeat(db_name).take(batch.num_rows())); - let data_type = array.data_type().clone(); - let array = Arc::new(array) as ArrayRef; - - let new_columns = once(array) - .chain(batch.columns().iter().cloned()) - .collect::>(); - - let new_fields = once(Field::new("database_name", data_type, false)) - .chain(batch.schema().fields().iter().cloned()) - .collect::>(); - let new_schema = Arc::new(Schema::new(new_fields)); - - RecordBatch::try_new(new_schema, new_columns).expect("Creating new record batch") - })) - } - - /// register a table provider for this system table - fn build(self, ctx: &mut SessionContext) { - let Self { - table_name, - batches, - } = self; - - let schema = if batches.is_empty() { - panic!("No batches for ChunksTableBuilder"); - } else { - batches[0].schema() - }; - - let partitions = batches - .into_iter() - .map(|batch| vec![batch]) - .collect::>(); - - let memtable = MemTable::try_new(schema, partitions).expect("creating memtable"); - - ctx.register_table(table_name.as_str(), Arc::new(memtable)) - .ok(); - } -} diff --git a/influxdb_iox/src/commands/sql/repl.rs b/influxdb_iox/src/commands/sql/repl.rs index 129367b906..c1b4e6808c 100644 --- a/influxdb_iox/src/commands/sql/repl.rs +++ b/influxdb_iox/src/commands/sql/repl.rs @@ -43,9 +43,6 @@ pub enum Error { source: influxdb_iox_client::flight::Error, }, - #[snafu(display("Error running observer query: {}", source))] - RunningObserverQuery { source: super::observer::Error }, - #[snafu(display("Cannot create REPL: {}", source))] ReplCreation { source: ReadlineError }, } @@ -55,9 +52,6 @@ pub type Result = std::result::Result; enum QueryEngine { /// Run queries against the namespace on the remote server Remote(String), - - /// Run queries against a local `Observer` instance - Observer(super::observer::Observer), } struct RustylineHelper { @@ -168,9 +162,6 @@ pub struct Repl { /// Current prompt prompt: String, - /// Connection to the server - connection: Connection, - /// Client for interacting with IOx namespace API namespace_client: influxdb_iox_client::namespace::Client, @@ -192,7 +183,7 @@ impl Repl { /// Create a new Repl instance, connected to the specified URL pub fn new(connection: Connection) -> Result { let namespace_client = influxdb_iox_client::namespace::Client::new(connection.clone()); - let flight_client = influxdb_iox_client::flight::Client::new(connection.clone()); + let flight_client = influxdb_iox_client::flight::Client::new(connection); let mut rl = Editor::new().context(ReplCreationSnafu)?; rl.set_helper(Some(RustylineHelper::default())); @@ -209,7 +200,6 @@ impl Repl { Ok(Self { rl, prompt, - connection, namespace_client, flight_client, query_engine: None, @@ -227,12 +217,6 @@ impl Repl { ReplCommand::Help => { self.print_help(); } - ReplCommand::Observer {} => { - self.use_observer() - .await - .map_err(|e| println!("{}", e)) - .ok(); - } ReplCommand::ShowNamespaces => { self.list_namespaces() .await @@ -317,13 +301,6 @@ impl Repl { scrape_query(&mut self.flight_client, db_name, &sql).await? } - Some(QueryEngine::Observer(observer)) => { - info!("Running sql on local observer"); - observer - .run_query(&sql) - .await - .context(RunningObserverQuerySnafu)? - } }; let end = Instant::now(); @@ -355,22 +332,11 @@ impl Repl { self.set_query_engine(QueryEngine::Remote(db_name)); } - async fn use_observer(&mut self) -> Result<()> { - println!("Preparing local views of remote system tables"); - let observer = super::observer::Observer::try_new(self.connection.clone()) - .await - .context(RunningObserverQuerySnafu)?; - println!("{}", observer.help()); - self.set_query_engine(QueryEngine::Observer(observer)); - Ok(()) - } - fn set_query_engine(&mut self, query_engine: QueryEngine) { self.prompt = match &query_engine { QueryEngine::Remote(db_name) => { format!("{}> ", db_name) } - QueryEngine::Observer(_) => "OBSERVER> ".to_string(), }; self.query_engine = Some(query_engine) } diff --git a/influxdb_iox/src/commands/sql/repl_command.rs b/influxdb_iox/src/commands/sql/repl_command.rs index 56f310ed7f..7139dc0045 100644 --- a/influxdb_iox/src/commands/sql/repl_command.rs +++ b/influxdb_iox/src/commands/sql/repl_command.rs @@ -5,7 +5,6 @@ use observability_deps::tracing::{debug, warn}; pub enum ReplCommand { Help, ShowNamespaces, - Observer, SetFormat { format: String }, UseNamespace { db_name: String }, SqlCommand { sql: String }, @@ -61,7 +60,6 @@ impl TryFrom<&str> for ReplCommand { warn!(%extra_content, "ignoring tokens after 'help'"); Ok(Self::Help) } - ["observer"] => Ok(Self::Observer), ["exit"] => Ok(Self::Exit), ["quit"] => Ok(Self::Exit), ["use", "namespace"] => { @@ -104,8 +102,6 @@ USE NAMESPACE : Set the current remote namespace to name SET FORMAT : Set the output format to Pretty, csv or json -OBSERVER: Locally query unified queryable views of remote system tables - [EXIT | QUIT]: Quit this session and exit the program # Examples: use remote namespace foo @@ -118,20 +114,6 @@ USE foo; SHOW TABLES; ;; Show available tables SHOW COLUMNS FROM my_table; ;; Show columns in the table -;; Show storage usage across partitions and tables -SELECT - partition_key, table_name, storage, - count(*) as chunk_count, - sum(memory_bytes)/(1024*1024) as size_mb -FROM - system.chunks -GROUP BY - partition_key, table_name, storage -ORDER BY - size_mb DESC -LIMIT 20 -; - "# } } @@ -169,21 +151,6 @@ mod tests { assert_eq!(" help me; ".try_into(), expected); } - #[test] - fn observer() { - let expected = Ok(ReplCommand::Observer); - assert_eq!("observer;".try_into(), expected); - assert_eq!("observer".try_into(), expected); - assert_eq!(" observer".try_into(), expected); - assert_eq!(" observer ".try_into(), expected); - assert_eq!(" OBSERVER ".try_into(), expected); - assert_eq!(" Observer; ".try_into(), expected); - assert_eq!(" observer ; ".try_into(), expected); - - let expected = sql_cmd(" observer me; "); - assert_eq!(" observer me; ".try_into(), expected); - } - #[test] fn show_namespaces() { let expected = Ok(ReplCommand::ShowNamespaces);