refactor: remove outdated observer sql repl mode (#5918)

* refactor: remove Observer mode from repl

* chore: remove outdated SQL docs

* fix: more update of sql docs
pull/24376/head
Andrew Lamb 2022-10-19 14:39:05 -04:00 committed by GitHub
parent 755fb029f9
commit 1df7a0d4fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 5 additions and 494 deletions

View File

@@ -32,7 +32,7 @@ Then run the `USE <your_namespace>` command
```
> use 810c5937734635d8_dbce66e3a6cbe757;
You are now in remote mode, querying namespace 810c5937734635d8_dbce66e3a6cbe757
810c5937734635d8_dbce66e3a6cbe757>
810c5937734635d8_dbce66e3a6cbe757>
```
Now all queries will run against the specified namespace (`810c5937734635d8_dbce66e3a6cbe757` in this example)
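For example, a subsequent query in remote mode runs directly against that namespace (`my_table` here is a hypothetical table name, not one from the docs):

```
810c5937734635d8_dbce66e3a6cbe757> select count(*) from my_table;
```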
@@ -67,31 +67,6 @@ Returned 1 row in 74.022768ms
```
## Observer
In this mode queries are run *locally* against a cached unified view of the remote system tables
```
810c5937734635d8_dbce66e3a6cbe757> observer;
Preparing local views of remote system tables
Loading system tables from 3 databases
...
Completed in 112.085784ms
You are now in Observer mode.
SQL commands in this mode run against a cached unified view of
remote system tables in all remote databases.
To see the unified tables available to you, try running
SHOW TABLES;
To reload the most recent version of the database system tables, run
OBSERVER;
OBSERVER>
```
# Query Cookbook
This section contains some common and useful queries against IOx system tables
@@ -108,9 +83,6 @@ my_db> show tables;
| table_catalog | table_schema | table_name | table_type |
+---------------+--------------------+-------------+------------+
| public | iox | query_count | BASE TABLE |
| public | system | chunks | BASE TABLE |
| public | system | columns | BASE TABLE |
| public | system | operations | BASE TABLE |
| public | information_schema | tables | VIEW |
| public | information_schema | columns | VIEW |
+---------------+--------------------+-------------+------------+
@@ -149,96 +121,6 @@ Query execution complete in 39.046225ms
```
## System Tables
Here are some interesting reports you can run when in `OBSERVER` mode:
### Total storage size taken by each database
```sql
SELECT
  database_name, count(*) as num_chunks,
  sum(memory_bytes)/1024/1024 as estimated_mb
FROM chunks
GROUP BY database_name
ORDER BY estimated_mb desc
LIMIT 20;
```
### Total estimated storage size by database and storage class
```sql
SELECT
  database_name, storage, count(*) as num_chunks,
  sum(memory_bytes)/1024/1024 as estimated_mb
FROM chunks
GROUP BY database_name, storage
ORDER BY estimated_mb desc
LIMIT 20;
```
### Total estimated storage size by database, table_name and storage class
```sql
SELECT
  database_name, table_name, storage, count(*) as num_chunks,
  sum(memory_bytes)/1024/1024 as estimated_mb
FROM chunks
GROUP BY database_name, table_name, storage
ORDER BY estimated_mb desc
LIMIT 20;
```
### Total row count by table
```sql
SELECT database_name, table_name, sum(total_rows) as total_rows
FROM (
  SELECT database_name, table_name, max(row_count) as total_rows
  FROM chunk_columns
  GROUP BY database_name, partition_key, table_name
)
GROUP BY database_name, table_name
ORDER BY total_rows DESC
LIMIT 20;
```
### Total row count by partition and table
```sql
SELECT database_name, partition_key, table_name, max(row_count) as total_rows
FROM chunk_columns
GROUP BY database_name, partition_key, table_name
ORDER BY total_rows DESC
LIMIT 20;
```
### Time range stored per table
This query estimates, for each table, how long a time range of data it holds in IOx
and the approximate number of rows per second within that range
(the `1,000,000,000` converts the nanosecond timestamps to seconds)
```sql
select table_name,
       1000000000.0 * total_rows / range as rows_per_sec,
       range / 1000000000.0 as range_sec,
       total_rows
from
  (select table_name,
          column_name,
          sum(row_count) as total_rows,
          max(cast(max_value as double)) - min(cast(min_value as double)) as range
   from chunk_columns
   where column_name = 'time'
   group by table_name, column_name
  )
where range > 0
order by range_sec desc;
```
# SQL Reference
Since IOx uses Apache Arrow's
@@ -249,14 +131,7 @@ In this section, IOx specific SQL tables, commands, and extensions are documented
## System Tables
In addition to the SQL standard `information_schema`, IOx contains several *system tables* that provide access to IOx specific information. The information in each system table is scoped to that particular database. Cross database queries are not possible due to the design of IOx's security model. Another process, such as the `observer` mode in the IOx SQL client, must be used for queries on information that spans databases.
In addition to the SQL standard `information_schema`, IOx contains several *system tables* that provide access to IOx specific information. The information in each system table is scoped to that particular database. Cross database queries are not possible due to the design of IOx's security model.
### `system.chunks`
`system.chunks` contains information about each IOx storage chunk (which holds part of the data for a table).
TODO: document each column, once they have stabilized.
### `system.columns`
`system.columns` contains IOx specific schema information about each column in each table, such as which columns were loaded as tags, fields, and timestamps in the InfluxDB data model.
TODO: document each column, once they have stabilized.
### `system.queries`
`system.queries` contains information about queries run against this IOx instance
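A minimal sketch of inspecting this table (the column set is not yet documented above, so `SELECT *` avoids depending on particular column names):

```sql
-- Show a sample of queries recorded against this instance
SELECT * FROM system.queries LIMIT 10;
```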

View File

@@ -5,7 +5,6 @@ use snafu::{ResultExt, Snafu};
use influxdb_iox_client::{connection::Connection, health};

mod observer;
mod repl;
mod repl_command;

View File

@@ -1,296 +0,0 @@
//! This module implements the "Observer" functionality of the SQL repl
use arrow::{
    array::{Array, ArrayRef, StringArray},
    datatypes::{Field, Schema},
    record_batch::RecordBatch,
};
use datafusion::{
    datasource::MemTable,
    prelude::{SessionConfig, SessionContext},
};
use influxdb_iox_client::{connection::Connection, flight::generated_types::ReadInfo};
use observability_deps::tracing::{debug, info};
use snafu::{ResultExt, Snafu};
use std::{collections::HashMap, sync::Arc, time::Instant};

#[derive(Debug, Snafu)]
pub enum Error {
    #[snafu(display("Generic error"))]
    Generic,

    #[snafu(display("Error loading remote state: {}", source))]
    LoadingDatabaseNames {
        source: influxdb_iox_client::error::Error,
    },

    #[snafu(display("Error running remote query: {}", source))]
    RunningRemoteQuery {
        source: influxdb_iox_client::flight::Error,
    },

    #[snafu(display("Error running observer query: {}", source))]
    Query {
        source: datafusion::error::DataFusionError,
    },
}

pub type Result<T, E = Error> = std::result::Result<T, E>;

/// The Observer contains a local DataFusion execution engine that is
/// pre-loaded with consolidated system table views.
pub struct Observer {
    /// DataFusion execution context for executing queries
    context: SessionContext,
}
impl Observer {
    /// Attempt to create a new observer instance, loading from the remote server
    pub async fn try_new(connection: Connection) -> Result<Self> {
        let mut context =
            SessionContext::with_config(SessionConfig::new().with_information_schema(true));

        load_remote_system_tables(&mut context, connection).await?;

        Ok(Self { context })
    }

    /// Runs the specified sql query locally against the preloaded context
    pub async fn run_query(&mut self, sql: &str) -> Result<Vec<RecordBatch>> {
        self.context
            .sql(sql)
            .await
            .context(QuerySnafu)?
            .collect()
            .await
            .context(QuerySnafu)
    }

    pub fn help(&self) -> String {
        r#"You are now in Observer mode.

SQL commands in this mode run against a cached unified view of
remote system tables in all remote databases.

To see the unified tables available to you, try running
SHOW TABLES;

To reload the most recent version of the database system tables, run
OBSERVER;
"#
        .to_string()
    }
}
/// Copies the data from the remote tables across all databases in a
/// remote server into a local copy that also has an extra
/// `database_name` column for the database
async fn load_remote_system_tables(
    context: &mut SessionContext,
    connection: Connection,
) -> Result<()> {
    // all prefixed with "system."
    let table_names = vec!["queries"];

    let start = Instant::now();

    let mut namespace_client = influxdb_iox_client::namespace::Client::new(connection.clone());

    let db_names: Vec<_> = namespace_client
        .get_namespaces()
        .await
        .context(LoadingDatabaseNamesSnafu)?
        .into_iter()
        .map(|ns| ns.name)
        .collect();

    println!("Loading system tables from {} databases", db_names.len());

    let tasks = db_names
        .into_iter()
        .flat_map(|db_name| {
            let table_names = table_names.clone();
            let connection = connection.clone();
            table_names.into_iter().map(move |table_name| {
                let table_name = table_name.to_string();
                let db_name = db_name.to_string();
                let connection = connection.clone();
                let sql = format!("select * from system.{}", table_name);
                tokio::task::spawn(async move {
                    let mut client = influxdb_iox_client::flight::Client::new(connection);
                    let mut query_results = client
                        .perform_query(ReadInfo {
                            namespace_name: db_name.clone(),
                            sql_query: sql,
                        })
                        .await
                        .context(RunningRemoteQuerySnafu)?;

                    let mut batches = vec![];
                    while let Some(data) = query_results
                        .next()
                        .await
                        .context(RunningRemoteQuerySnafu)?
                    {
                        batches.push(data);
                    }

                    let t: Result<RemoteSystemTable> = Ok(RemoteSystemTable {
                        db_name,
                        table_name,
                        batches,
                    });
                    print!("."); // give some indication of progress
                    use std::io::Write;
                    std::io::stdout().flush().unwrap();
                    t
                })
            })
        })
        .collect::<Vec<_>>();

    // now, get the results and combine them
    let results = futures::future::join_all(tasks).await;

    let mut builder = AggregatedTableBuilder::new();
    results.into_iter().for_each(|result| {
        match result {
            Ok(Ok(table)) => {
                builder.append(table);
            }
            // This is not a fatal error so log it and keep going
            Ok(Err(e)) => {
                println!("WARNING: Error running query: {}", e);
            }
            // This is not a fatal error so log it and keep going
            Err(e) => {
                println!("WARNING: Error running task: {}", e);
            }
        }
    });
    println!();
    println!(" Completed in {:?}", Instant::now() - start);

    builder.build(context);

    Ok(())
}
/// Contains the results from a system table query for a specific database
#[derive(Debug)]
struct RemoteSystemTable {
    db_name: String,
    table_name: String,
    batches: Vec<RecordBatch>,
}

/// Aggregates several table responses into a unified view
#[derive(Debug, Default)]
struct AggregatedTableBuilder {
    tables: HashMap<String, VirtualTableBuilder>,
}

impl AggregatedTableBuilder {
    fn new() -> Self {
        Self::default()
    }

    /// Appends a table response to the aggregated tables being built
    fn append(&mut self, t: RemoteSystemTable) {
        let RemoteSystemTable {
            db_name,
            table_name,
            batches,
        } = t;

        let num_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        info!(%table_name, %db_name, num_batches=batches.len(), %num_rows, "Aggregating results");

        let table_builder = self
            .tables
            .entry(table_name.clone())
            .or_insert_with(|| VirtualTableBuilder::new(table_name));

        table_builder.append_batches(&db_name, batches);
    }

    /// Registers a table provider for each aggregated system table
    fn build(self, ctx: &mut SessionContext) {
        let Self { tables } = self;
        for (table_name, table_builder) in tables {
            debug!(%table_name, "registering system table");
            table_builder.build(ctx);
        }
    }
}
/// Creates a "virtual" version of `select * from <table>` which has a
/// "database_name" column pre-pended to all actual record batches
#[derive(Debug)]
struct VirtualTableBuilder {
    table_name: String,
    batches: Vec<RecordBatch>,
}

impl VirtualTableBuilder {
    pub fn new(table_name: impl Into<String>) -> Self {
        let table_name = table_name.into();
        Self {
            table_name,
            batches: Vec::new(),
        }
    }

    /// Append batches from `select * from <system table>` to the
    /// results being created
    fn append_batches(&mut self, db_name: &str, new_batches: Vec<RecordBatch>) {
        self.batches.extend(new_batches.into_iter().map(|batch| {
            use std::iter::once;

            let array =
                StringArray::from_iter_values(std::iter::repeat(db_name).take(batch.num_rows()));
            let data_type = array.data_type().clone();
            let array = Arc::new(array) as ArrayRef;

            let new_columns = once(array)
                .chain(batch.columns().iter().cloned())
                .collect::<Vec<ArrayRef>>();

            let new_fields = once(Field::new("database_name", data_type, false))
                .chain(batch.schema().fields().iter().cloned())
                .collect::<Vec<Field>>();
            let new_schema = Arc::new(Schema::new(new_fields));

            RecordBatch::try_new(new_schema, new_columns).expect("Creating new record batch")
        }))
    }

    /// Register a table provider for this system table
    fn build(self, ctx: &mut SessionContext) {
        let Self {
            table_name,
            batches,
        } = self;

        let schema = if batches.is_empty() {
            panic!("No batches for ChunksTableBuilder");
        } else {
            batches[0].schema()
        };

        let partitions = batches
            .into_iter()
            .map(|batch| vec![batch])
            .collect::<Vec<_>>();

        let memtable = MemTable::try_new(schema, partitions).expect("creating memtable");

        ctx.register_table(table_name.as_str(), Arc::new(memtable))
            .ok();
    }
}

View File

@@ -43,9 +43,6 @@ pub enum Error {
        source: influxdb_iox_client::flight::Error,
    },

    #[snafu(display("Error running observer query: {}", source))]
    RunningObserverQuery { source: super::observer::Error },

    #[snafu(display("Cannot create REPL: {}", source))]
    ReplCreation { source: ReadlineError },
}
@@ -55,9 +52,6 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
enum QueryEngine {
    /// Run queries against the namespace on the remote server
    Remote(String),

    /// Run queries against a local `Observer` instance
    Observer(super::observer::Observer),
}
struct RustylineHelper {
@@ -168,9 +162,6 @@ pub struct Repl {
    /// Current prompt
    prompt: String,

    /// Connection to the server
    connection: Connection,

    /// Client for interacting with IOx namespace API
    namespace_client: influxdb_iox_client::namespace::Client,
@@ -192,7 +183,7 @@ impl Repl {
    /// Create a new Repl instance, connected to the specified URL
    pub fn new(connection: Connection) -> Result<Self> {
        let namespace_client = influxdb_iox_client::namespace::Client::new(connection.clone());
        let flight_client = influxdb_iox_client::flight::Client::new(connection.clone());
        let flight_client = influxdb_iox_client::flight::Client::new(connection);

        let mut rl = Editor::new().context(ReplCreationSnafu)?;
        rl.set_helper(Some(RustylineHelper::default()));
@@ -209,7 +200,6 @@ impl Repl {
        Ok(Self {
            rl,
            prompt,
            connection,
            namespace_client,
            flight_client,
            query_engine: None,
@@ -227,12 +217,6 @@ impl Repl {
            ReplCommand::Help => {
                self.print_help();
            }
            ReplCommand::Observer {} => {
                self.use_observer()
                    .await
                    .map_err(|e| println!("{}", e))
                    .ok();
            }
            ReplCommand::ShowNamespaces => {
                self.list_namespaces()
                    .await
@@ -317,13 +301,6 @@ impl Repl {
                scrape_query(&mut self.flight_client, db_name, &sql).await?
            }
            Some(QueryEngine::Observer(observer)) => {
                info!("Running sql on local observer");
                observer
                    .run_query(&sql)
                    .await
                    .context(RunningObserverQuerySnafu)?
            }
        };

        let end = Instant::now();
@@ -355,22 +332,11 @@ impl Repl {
        self.set_query_engine(QueryEngine::Remote(db_name));
    }

    async fn use_observer(&mut self) -> Result<()> {
        println!("Preparing local views of remote system tables");
        let observer = super::observer::Observer::try_new(self.connection.clone())
            .await
            .context(RunningObserverQuerySnafu)?;
        println!("{}", observer.help());
        self.set_query_engine(QueryEngine::Observer(observer));
        Ok(())
    }

    fn set_query_engine(&mut self, query_engine: QueryEngine) {
        self.prompt = match &query_engine {
            QueryEngine::Remote(db_name) => {
                format!("{}> ", db_name)
            }
            QueryEngine::Observer(_) => "OBSERVER> ".to_string(),
        };
        self.query_engine = Some(query_engine)
    }

View File

@@ -5,7 +5,6 @@ use observability_deps::tracing::{debug, warn};
pub enum ReplCommand {
    Help,
    ShowNamespaces,
    Observer,
    SetFormat { format: String },
    UseNamespace { db_name: String },
    SqlCommand { sql: String },
@@ -61,7 +60,6 @@ impl TryFrom<&str> for ReplCommand {
            warn!(%extra_content, "ignoring tokens after 'help'");
            Ok(Self::Help)
        }
        ["observer"] => Ok(Self::Observer),
        ["exit"] => Ok(Self::Exit),
        ["quit"] => Ok(Self::Exit),
        ["use", "namespace"] => {
@@ -104,8 +102,6 @@ USE NAMESPACE <name>: Set the current remote namespace to name
SET FORMAT <format>: Set the output format to Pretty, csv or json
OBSERVER: Locally query unified queryable views of remote system tables
[EXIT | QUIT]: Quit this session and exit the program
# Examples: use remote namespace foo
@@ -118,20 +114,6 @@ USE foo;
SHOW TABLES; ;; Show available tables
SHOW COLUMNS FROM my_table; ;; Show columns in the table

;; Show storage usage across partitions and tables
SELECT
  partition_key, table_name, storage,
  count(*) as chunk_count,
  sum(memory_bytes)/(1024*1024) as size_mb
FROM
  system.chunks
GROUP BY
  partition_key, table_name, storage
ORDER BY
  size_mb DESC
LIMIT 20
;
"#
    }
}
@@ -169,21 +151,6 @@ mod tests {
        assert_eq!(" help me; ".try_into(), expected);
    }

    #[test]
    fn observer() {
        let expected = Ok(ReplCommand::Observer);
        assert_eq!("observer;".try_into(), expected);
        assert_eq!("observer".try_into(), expected);
        assert_eq!(" observer".try_into(), expected);
        assert_eq!(" observer ".try_into(), expected);
        assert_eq!(" OBSERVER ".try_into(), expected);
        assert_eq!(" Observer; ".try_into(), expected);
        assert_eq!(" observer ; ".try_into(), expected);

        let expected = sql_cmd(" observer me; ");
        assert_eq!(" observer me; ".try_into(), expected);
    }

    #[test]
    fn show_namespaces() {
        let expected = Ok(ReplCommand::ShowNamespaces);