Merge branch 'main' into cn/list-soft-deleted
commit 2d41fd519f
@@ -93,7 +93,7 @@ impl<K: AsPrimitive<usize> + FromPrimitive + Zero> PackedStringArray<K> {

     /// Return the amount of memory in bytes taken up by this array
     pub fn size(&self) -> usize {
-        self.storage.len() + self.offsets.len() * std::mem::size_of::<K>()
+        self.storage.capacity() + self.offsets.capacity() * std::mem::size_of::<K>()
     }

     pub fn into_inner(self) -> (Vec<K>, String) {
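Why `capacity()` rather than `len()`: a `Vec` typically allocates more than it currently uses, so summing lengths under-reports the memory a chunk is really holding. A standalone Rust sketch (not part of the diff) of the difference:

```rust
fn main() {
    let mut storage: Vec<u8> = Vec::new();
    storage.extend_from_slice(b"hello");

    // Bytes currently used vs. bytes actually allocated by the Vec.
    let used = storage.len();
    let allocated = storage.capacity();

    // `capacity() >= len()`; sizing by `len()` under-counts what the
    // allocator has really handed out for this buffer.
    assert!(allocated >= used);
    println!("used = {}, allocated = {}", used, allocated);
}
```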
@@ -67,7 +67,7 @@ instance to write to Kafka and the database in the "reader" IOx instance to read
 you run it with:

 ```
-cargo run -p iox_data_generator --bin create_database -- --writer 127.0.0.1:8082 --reader 127.0.0.1:8086 mlb_pirates
+cargo run --release -p iox_data_generator --bin create_database -- --writer 127.0.0.1:8082 --reader 127.0.0.1:8086 mlb_pirates
 ```

 This script adds 3 rows to a `writer_test` table because [this issue with the Kafka Consumer
@@ -81,7 +81,7 @@ from an existing schema as a guide. In this example, we're going to use
 Next, run the data generation tool as follows:

 ```
-cargo run -p iox_data_generator -- --spec iox_data_generator/schemas/cap-write.toml --continue --host 127.0.0.1:8080 --token arbitrary --org mlb --bucket pirates
+cargo run --release -p iox_data_generator -- --spec iox_data_generator/schemas/cap-write.toml --continue --host 127.0.0.1:8080 --token arbitrary --org mlb --bucket pirates
 ```

 - `--spec iox_data_generator/schemas/cap-write.toml` sets the schema you want to use to generate the data
@@ -100,7 +100,7 @@ is,

 ```
 # in your influxdb_iox checkout
-cargo run -- sql -h http://127.0.0.1:8086
+cargo run --release -- sql -h http://127.0.0.1:8086
 ```

 Connecting to the writer instance won't show any data.
@@ -126,23 +126,23 @@ impl MBChunk {
         Ok(())
     }

-    /// Returns a queryable snapshot of this chunk
+    /// Returns a queryable snapshot of this chunk and an indicator if the snapshot was just cached.
     #[cfg(not(feature = "nocache"))]
-    pub fn snapshot(&self) -> Arc<ChunkSnapshot> {
+    pub fn snapshot(&self) -> (Arc<ChunkSnapshot>, bool) {
         let mut guard = self.snapshot.lock();
         if let Some(snapshot) = &*guard {
-            return Arc::clone(snapshot);
+            return (Arc::clone(snapshot), false);
         }

         let snapshot = Arc::new(ChunkSnapshot::new(self));
         *guard = Some(Arc::clone(&snapshot));
-        snapshot
+        (snapshot, true)
     }

-    /// Returns a queryable snapshot of this chunk
+    /// Returns a queryable snapshot of this chunk and an indicator if the snapshot was just cached.
     #[cfg(feature = "nocache")]
-    pub fn snapshot(&self) -> Arc<ChunkSnapshot> {
-        Arc::new(ChunkSnapshot::new(self))
+    pub fn snapshot(&self) -> (Arc<ChunkSnapshot>, bool) {
+        (Arc::new(ChunkSnapshot::new(self)), false)
     }

     /// Return the name of the table in this chunk
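The extra `bool` reports whether the snapshot was created and cached by this call, so callers know the chunk's accounted memory just changed (the `DbChunk` hunk further down uses it to call `update_metrics`). A self-contained sketch of the same cache-and-flag pattern, using `std::sync::Mutex` where the diff uses a `parking_lot`-style lock:

```rust
use std::sync::{Arc, Mutex};

// Minimal sketch of the caching pattern used by `MBChunk::snapshot` above:
// return the cached value plus a flag saying whether it was just created.
struct Cache {
    snapshot: Mutex<Option<Arc<String>>>,
}

impl Cache {
    fn snapshot(&self) -> (Arc<String>, bool) {
        let mut guard = self.snapshot.lock().unwrap();
        if let Some(snapshot) = &*guard {
            return (Arc::clone(snapshot), false);
        }
        let snapshot = Arc::new("expensive snapshot".to_string());
        *guard = Some(Arc::clone(&snapshot));
        (snapshot, true)
    }
}

fn main() {
    let cache = Cache { snapshot: Mutex::new(None) };
    let (_s1, first) = cache.snapshot();
    let (_s2, second) = cache.snapshot();
    assert!(first); // created and cached on the first call
    assert!(!second); // served from the cache afterwards
}
```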
@@ -227,14 +227,26 @@ impl MBChunk {
     /// Return the approximate memory size of the chunk, in bytes including the
     /// dictionary, tables, and their rows.
+    ///
+    /// This includes the size of `self`.
+    ///
+    /// Note: This does not include the size of any cached ChunkSnapshot
     pub fn size(&self) -> usize {
         // TODO: Better accounting of non-column data (#1565)
-        self.columns
+        let size_self = std::mem::size_of::<Self>();
+
+        let size_columns = self
+            .columns
             .iter()
-            .map(|(k, v)| k.len() + v.size())
-            .sum::<usize>()
-            + self.table_name.len()
+            .map(|(k, v)| k.capacity() + v.size())
+            .sum::<usize>();
+
+        let size_table_name = self.table_name.len();
+
+        let snapshot_size = {
+            let guard = self.snapshot.lock();
+            guard.as_ref().map(|snapshot| snapshot.size()).unwrap_or(0)
+        };
+
+        size_self + size_columns + size_table_name + snapshot_size
     }

     /// Returns an iterator over (column_name, estimated_size) for all
@@ -814,12 +826,16 @@ mod tests {
         let lp = vec!["cpu,host=a val=23 1", "cpu,host=b val=2 1"].join("\n");
         let mut chunk = write_lp_to_new_chunk(&lp).unwrap();

-        let s1 = chunk.snapshot();
-        let s2 = chunk.snapshot();
+        let (s1, c1) = chunk.snapshot();
+        assert!(c1);
+        let (s2, c2) = chunk.snapshot();
+        assert!(!c2);

         write_lp_to_chunk(&lp, &mut chunk).unwrap();
-        let s3 = chunk.snapshot();
-        let s4 = chunk.snapshot();
+        let (s3, c3) = chunk.snapshot();
+        assert!(c3);
+        let (s4, c4) = chunk.snapshot();
+        assert!(!c4);

         assert_eq!(Arc::as_ptr(&s1), Arc::as_ptr(&s2));
         assert_ne!(Arc::as_ptr(&s1), Arc::as_ptr(&s3));
@@ -846,8 +862,12 @@ mod tests {
         write_lp_to_chunk(&lp, &mut chunk).unwrap();
         let s3 = chunk.size();

-        // Should increase by a constant amount each time
-        assert_eq!(s2 - s1, s3 - s2);
+        // Should increase or stay identical (if array capacities are sufficient) each time
+        assert!(s2 >= s1);
+        assert!(s3 >= s2);
+
+        // also assume that we wrote enough data to bump the capacity at least once
+        assert!(s3 > s1);
     }

     #[test]
@@ -319,24 +319,29 @@ impl Column {
            }
        }
    }

-    /// The approximate memory size of the data in the column. Note that
-    /// the space taken for the tag string values is represented in
-    /// the dictionary size in the chunk that holds the table that has this
-    /// column. The size returned here is only for their identifiers.
+    /// The approximate memory size of the data in the column.
+    ///
+    /// This includes the size of `self`.
    pub fn size(&self) -> usize {
        let data_size = match &self.data {
-            ColumnData::F64(v, stats) => mem::size_of::<f64>() * v.len() + mem::size_of_val(&stats),
-            ColumnData::I64(v, stats) => mem::size_of::<i64>() * v.len() + mem::size_of_val(&stats),
-            ColumnData::U64(v, stats) => mem::size_of::<u64>() * v.len() + mem::size_of_val(&stats),
+            ColumnData::F64(v, stats) => {
+                mem::size_of::<f64>() * v.capacity() + mem::size_of_val(&stats)
+            }
+            ColumnData::I64(v, stats) => {
+                mem::size_of::<i64>() * v.capacity() + mem::size_of_val(&stats)
+            }
+            ColumnData::U64(v, stats) => {
+                mem::size_of::<u64>() * v.capacity() + mem::size_of_val(&stats)
+            }
            ColumnData::Bool(v, stats) => v.byte_len() + mem::size_of_val(&stats),
            ColumnData::Tag(v, dictionary, stats) => {
-                mem::size_of::<DID>() * v.len() + dictionary.size() + mem::size_of_val(&stats)
+                mem::size_of::<DID>() * v.capacity() + dictionary.size() + mem::size_of_val(&stats)
            }
            ColumnData::String(v, stats) => {
                v.size() + mem::size_of_val(&stats) + stats.string_size()
            }
        };
-        data_size + self.valid.byte_len()
+        mem::size_of::<Self>() + data_size + self.valid.byte_len()
    }

    pub fn to_arrow(&self) -> Result<ArrayRef> {
@@ -0,0 +1,2 @@
+logs/
+volumes/
@@ -0,0 +1,134 @@
+# Performance Tests
+
+This tool starts a complete test environment:
+
+- Kafka (docker)
+- Minio (docker)
+- Jaeger (docker)
+- IOx router (local process)
+- IOx writer (local process)
+- test battery:
+  - generate data with iox_data_generator
+  - query data and benchmark
+
+Logs live in `perf/logs`.
+As long as perf.py is running, this works: `tail -f logs/iox_router.log`
+After perf.py exits, log files are closed.
+When perf.py is run again, old log files are deleted.
+
+Persistence volumes live in `perf/volumes`.
+Similar to log files, these data remain after perf.py exits, and are deleted when perf.py is run again.
+
+## Test Batteries
+
+A test battery is composed of:
+- a directory, named for the battery name
+- data generator spec, in file `datagen.toml`
+- query tests, in file `queries.toml`
+  - SQL query
+  - (optional) name of query test
+  - (optional) expected results, as a string or as a file
+
+The data generator spec format is that defined by `iox_data_generator`.
+[Read about that here](../iox_data_generator/README.md).
+
+The query tests file looks like this:
+```toml
+[[queries]]
+name = "example query, no expected result"
+sql = "select count(*) from cpu"
+
+[[queries]]
+name = "example query, expected result in string"
+sql = "select count(*) from cpu"
+expect = """
+COUNT(Uint8(1))
+3
+"""
+
+[[queries]]
+name = "example query, expected result in file"
+sql = "select count(*) from cpu"
+expect_filename = "foo.csv"
+```
+
+## Usage
+
+Help:
+```console
+perf/perf.py --help
+```
+
+Run all test batteries:
+```console
+perf/perf.py all
+```
+
+Run test batteries `battery-0` and `battery-1`:
+```console
+perf/perf.py battery-0 battery-1
+```
+
+Keep all processes running after test batteries have completed:
+```console
+perf/perf.py battery-0 --hold
+```
+
+Do not run any tests, just create an empty playground environment:
+```console
+perf/perf.py --hold
+```
+
+Do not build IOx:
+```console
+perf/perf.py --skip-build
+```
+
+Use debug binaries (`target/debug`) rather than release binaries:
+```console
+perf/perf.py --debug
+```
+
+Use Kafka/Zookeeper instead of Redpanda:
+```console
+perf/perf.py --kafka-zookeeper
+```
+
+Use in-memory object store implementation, instead of S3/Minio:
+```console
+perf/perf.py --object-store memory
+```
+
+Use file object store implementation, instead of S3/Minio:
+```console
+perf/perf.py --object-store file
+```
+
+Just delete docker containers and network, then exit.
+In the future, this will also detect orphaned IOx processes generated by perf, and delete those too:
+```console
+perf/perf.py --cleanup
+```
+
+## Install
+
+Install Docker:
+https://www.docker.com/products/docker-desktop
+
+Install python3:
+
+```console
+brew install python3
+```
+
+or:
+
+```console
+apt install python3 python3-pip
+```
+
+Install the required Python packages:
+
+```console
+python3 -m pip install -r perf/requirements.txt
+```
@@ -0,0 +1,18 @@
+name = "example"
+
+[[agents]]
+name = "cap_write_{{agent_id}}"
+count = 3
+sampling_interval = "10s"
+
+[[agents.measurements]]
+name = "cpu"
+
+[[agents.measurements.tags]]
+name = "host"
+value = "host-{{agent_id}}"
+
+[[agents.measurements.fields]]
+name = "usage_user"
+f64_range = [0.0, 100.0]
@@ -0,0 +1,2 @@
+COUNT(Uint8(1))
+3
@@ -0,0 +1,16 @@
+[[queries]]
+name = "example query, no expected result"
+sql = "select count(*) from cpu"
+
+[[queries]]
+name = "example query, expected result in string"
+sql = "select count(*) from cpu"
+expect = """
+COUNT(Uint8(1))
+3
+"""
+
+[[queries]]
+name = "example query, expected result in file"
+sql = "select count(*) from cpu"
+expect_filename = "foo.csv"
@@ -0,0 +1,633 @@
+#!/usr/bin/env python3
+
+import argparse
+import glob
+import logging
+import math
+import os
+import pathlib
+import shutil
+import signal
+import socket
+import subprocess
+import threading
+import time
+
+import docker
+import grpc_requests
+import minio
+import requests
+import toml
+import urllib3
+
+ioxperf_name = "ioxperf"
+ioxperf_labels = {ioxperf_name: None}
+ioxperf_filters = {'label': ioxperf_name}
+org_name = 'myorg'
+bucket_name = 'mybucket'
+db_name = '%s_%s' % (org_name, bucket_name)
+
+
+def main():
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--skip-build', help='do not build IOx, execute existing binaries', action='store_true')
+    parser.add_argument('--debug', help='build/execute debug IOx binaries instead of release', action='store_true')
+    parser.add_argument('--object-store', help='object store type', default='s3', choices=('memory', 's3', 'file'))
+    parser.add_argument('--kafka-zookeeper', help='use Kafka/ZooKeeper instead of Redpanda', action='store_true')
+    parser.add_argument('--hold', help='keep all services running after tests complete', action='store_true')
+    parser.add_argument('--cleanup', help='remove Docker assets and exit (TODO terminate IOx processes)', action='store_true')
+    parser.add_argument('batteries', help='name of directories containing test batteries, or "all"', nargs='*')
+    args = parser.parse_args()
+
+    os.chdir(os.path.dirname(os.path.abspath(__file__)))
+
+    dc = docker.from_env()
+    if args.cleanup:
+        docker_cleanup_resources(dc)
+        return
+    cleanup_logs_and_volumes(dc)
+
+    batteries = args.batteries
+    if batteries == ['all']:
+        batteries = (
+            p.relative_to(os.getcwd())
+            for p
+            in pathlib.Path(os.getcwd()).iterdir()
+            if p.joinpath('datagen.toml').is_file()
+        )
+    else:
+        for battery in batteries:
+            p = pathlib.Path(os.getcwd()).joinpath(battery, 'datagen.toml')
+            if not p.is_file():
+                print('invalid battery "%s" - does not contain datagen.toml' % battery)
+                exit(1)
+
+    processes = {}
+
+    try:
+        if not args.skip_build:
+            cargo_build_iox(args.debug)
+
+        docker_create_network(dc)
+        if args.kafka_zookeeper:
+            docker_run_zookeeper(dc)
+            docker_run_kafka(dc)
+        else:
+            docker_run_redpanda(dc)
+        if args.object_store == 's3':
+            docker_run_minio(dc)
+        docker_run_jaeger(dc)
+        processes['iox_router'] = exec_iox(1, 'iox_router', debug=args.debug, object_store=args.object_store)
+        processes['iox_writer'] = exec_iox(2, 'iox_writer', debug=args.debug, object_store=args.object_store)
+        grpc_create_database(1, 2)
+
+        print('-' * 40)
+        for battery in batteries:
+            run_test_battery(battery, 1, 2, debug=args.debug)
+            print('-' * 40)
+
+    except Exception as e:
+        print(e)
+
+    if args.hold:
+        print('subprocesses are still running, ctrl-C to terminate and exit')
+        try:
+            signal.pause()
+        except KeyboardInterrupt:
+            pass
+        print('-' * 40)
+
+    for service_name, process in processes.items():
+        if process is None:
+            continue
+        print('%s <- SIGTERM' % service_name)
+        process.send_signal(signal.SIGTERM)
+        exit_code = process.wait(1.0)
+        if exit_code is None:
+            print('%s <- SIGKILL' % service_name)
+            process.send_signal(signal.SIGKILL)
+        if exit_code != 0:
+            print('%s exited with %d' % (service_name, exit_code))
+    docker_cleanup_resources(dc)
+
+
+def docker_cleanup_resources(dc):
+    containers = dc.containers.list(all=True, filters=ioxperf_filters)
+    if len(containers) > 0:
+        print('removing containers: %s' % ', '.join((c.name for c in containers)))
+        for container in containers:
+            container.remove(v=True, force=True)
+
+    networks = dc.networks.list(filters=ioxperf_filters)
+    if len(networks) > 0:
+        print('removing networks: %s' % ', '.join((n.name for n in networks)))
+        for network in networks:
+            network.remove()
+
+
+def cleanup_logs_and_volumes(dc):
+    docker_cleanup_resources(dc)
+
+    volume_paths = glob.glob(os.path.join(os.getcwd(), 'volumes', '*'))
+    if len(volume_paths) > 0:
+        print('removing volume contents: %s' % ', '.join((os.path.relpath(p) for p in volume_paths)))
+        for path in volume_paths:
+            shutil.rmtree(path)
+
+    log_paths = glob.glob(os.path.join(os.getcwd(), 'logs', '*'))
+    if len(log_paths) > 0:
+        print('removing logs: %s' % ', '.join((os.path.relpath(p) for p in log_paths)))
+        for path in log_paths:
+            os.unlink(path)
+
+
+def docker_create_network(dc):
+    dc.networks.create(name=ioxperf_name, driver='bridge', check_duplicate=True, scope='local',
+                       labels=ioxperf_labels)
+
+
+def docker_pull_image_if_needed(dc, image):
+    try:
+        dc.images.get(image)
+    except docker.errors.ImageNotFound:
+        print("pulling image '%s'..." % image)
+        dc.images.pull(image)
+
+
+def docker_wait_container_running(container):
+    while True:
+        container.reload()
+        if container.status == 'running':
+            print("container '%s' has started" % container.name)
+            return
+        elif container.status == 'created':
+            print("waiting for container '%s' to start" % container.name)
+            time.sleep(0.1)
+            continue
+        raise Exception("container '%s' status '%s' unexpected" % (container.name, container.status))
+
+
+def pipe_container_logs_to_file(container, log_filename):
+    with pathlib.Path(os.path.join(os.getcwd(), 'logs')) as dir_path:
+        if not dir_path.exists():
+            os.mkdir(dir_path, mode=0o777)
+
+    logs = container.logs(stdout=True, stderr=True, stream=True, follow=True)
+    f = open(file=os.path.join(os.getcwd(), 'logs', log_filename), mode='wb', buffering=0)
+
+    def thread_function():
+        for entry in logs:
+            f.write(entry)
+            f.flush()
+        f.close()
+
+    threading.Thread(target=thread_function, daemon=True).start()
+
+
+def check_port_open(addr, port):
+    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    port_open = sock.connect_ex((addr, port)) == 0
+    sock.close()
+    return port_open
+
+
+def docker_run_redpanda(dc):
+    image = 'vectorized/redpanda:v21.7.6'
+    command = ['redpanda', 'start',
+               '--overprovisioned', '--smp 1', '--memory 128M', '--reserve-memory', '0M', '--node-id', '0',
+               '--check=false', '--kafka-addr', 'CLIENT://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093',
+               '--advertise-kafka-addr', 'CLIENT://kafka:9092,EXTERNAL://localhost:9093']
+    name = '%s-%s' % (ioxperf_name, 'redpanda')
+    ports = {'9093/tcp': 9093}
+    volumes = {os.path.join(os.getcwd(), 'volumes/redpanda'): {
+        'bind': '/var/lib/redpanda/data',
+        'mode': 'rw',
+    }}
+    docker_pull_image_if_needed(dc, image)
+    container = dc.containers.run(image=image, command=command, detach=True, name=name, hostname='kafka',
+                                  labels=ioxperf_labels, network=ioxperf_name, ports=ports, volumes=volumes)
+    docker_wait_container_running(container)
+
+    while True:
+        if check_port_open('127.0.0.1', 9093):
+            break
+        print('waiting for Redpanda to become ready')
+        time.sleep(0.1)
+
+    pipe_container_logs_to_file(container, 'redpanda.log')
+    print('Redpanda service is ready')
+
+    return container
+
+
+def docker_run_zookeeper(dc):
+    image = 'docker.io/bitnami/zookeeper:3'
+    name = '%s-%s' % (ioxperf_name, 'zookeeper')
+    ports = {'2181/tcp': 2181}
+    env = {
+        'ALLOW_ANONYMOUS_LOGIN': 'yes',
+    }
+    volumes = {os.path.join(os.getcwd(), 'volumes/zookeeper'): {
+        'bind': '/bitnami/zookeeper',
+        'mode': 'rw',
+    }}
+    docker_pull_image_if_needed(dc, image)
+    container = dc.containers.run(image=image, detach=True, environment=env, name=name, hostname='zookeeper',
+                                  labels=ioxperf_labels, network=ioxperf_name, ports=ports, volumes=volumes)
+    docker_wait_container_running(container)
+
+    while True:
+        if check_port_open('127.0.0.1', 2181):
+            break
+        print('waiting for ZooKeeper to become ready')
+        time.sleep(0.1)
+
+    pipe_container_logs_to_file(container, 'zookeeper.log')
+    print('ZooKeeper service is ready')
+
+    return container
+
+
+def docker_run_kafka(dc):
+    image = 'docker.io/bitnami/kafka:2'
+    name = '%s-%s' % (ioxperf_name, 'kafka')
+    ports = {'9093/tcp': 9093}
+    env = {
+        'KAFKA_CFG_ZOOKEEPER_CONNECT': 'zookeeper:2181',
+        'ALLOW_PLAINTEXT_LISTENER': 'yes',
+        'KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP': 'CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT',
+        'KAFKA_CFG_LISTENERS': 'CLIENT://:9092,EXTERNAL://:9093',
+        'KAFKA_CFG_ADVERTISED_LISTENERS': 'CLIENT://kafka:9092,EXTERNAL://localhost:9093',
+        'KAFKA_INTER_BROKER_LISTENER_NAME': 'CLIENT',
+        'KAFKA_CFG_LOG_RETENTION_CHECK_INTERVAL_MS': '100',
+    }
+    volumes = {os.path.join(os.getcwd(), 'volumes/kafka'): {
+        'bind': '/bitname/kafka',
+        'mode': 'rw',
+    }}
+    docker_pull_image_if_needed(dc, image)
+    container = dc.containers.run(image=image, detach=True, environment=env, name=name, hostname='kafka',
+                                  labels=ioxperf_labels, network=ioxperf_name, ports=ports, volumes=volumes)
+    docker_wait_container_running(container)
+
+    while True:
+        if check_port_open('127.0.0.1', 9093):
+            break
+        print('waiting for Kafka to become ready')
+        time.sleep(0.1)
+
+    pipe_container_logs_to_file(container, 'kafka.log')
+    print('Kafka service is ready')
+
+    return container
+
+
+def docker_run_minio(dc):
+    image = 'minio/minio:RELEASE.2021-08-05T22-01-19Z'
+    command = 'server --address 0.0.0.0:9000 --console-address 0.0.0.0:9001 /data'
+    name = '%s-%s' % (ioxperf_name, 'minio')
+    ports = {'9000/tcp': 9000, '9001/tcp': 9001}
+    volumes = {os.path.join(os.getcwd(), 'volumes/minio'): {
+        'bind': '/data',
+        'mode': 'rw',
+    }}
+    env = {
+        'MINIO_ROOT_USER': 'minio',
+        'MINIO_ROOT_PASSWORD': 'miniominio',
+        'MINIO_PROMETHEUS_AUTH_TYPE': 'public',
+        'MINIO_HTTP_TRACE': '/dev/stdout',
+    }
+    docker_pull_image_if_needed(dc, image)
+    container = dc.containers.run(image=image, command=command, detach=True, environment=env, name=name,
+                                  hostname='minio', labels=ioxperf_labels, network=ioxperf_name, ports=ports,
+                                  volumes=volumes)
+    docker_wait_container_running(container)
+
+    while True:
+        timeout = urllib3.util.Timeout(connect=0.1, read=0.1)
+        http_client = urllib3.PoolManager(num_pools=1, timeout=timeout, retries=False)
+        try:
+            mc = minio.Minio(endpoint='127.0.0.1:9000', access_key='minio', secret_key='miniominio', secure=False,
+                             http_client=http_client)
+            if not mc.bucket_exists('iox1'):
+                mc.make_bucket('iox1')
+            if not mc.bucket_exists('iox2'):
+                mc.make_bucket('iox2')
+            break
+        except (urllib3.exceptions.ProtocolError, urllib3.exceptions.TimeoutError, minio.error.S3Error):
+            pass
+        print('waiting for Minio to become ready')
+        time.sleep(0.5)

+    pipe_container_logs_to_file(container, 'minio.log')
+    print('Minio service ready')
+
+    return container
+
+
+def docker_run_jaeger(dc):
+    image = 'jaegertracing/all-in-one:1.25.0'
+    name = '%s-%s' % (ioxperf_name, 'jaeger')
+    ports = {'16686/tcp': 16686}
+    docker_pull_image_if_needed(dc, image)
+    container = dc.containers.run(image=image, detach=True, name=name, hostname='jaeger', labels=ioxperf_labels,
+                                  network=ioxperf_name, ports=ports)
+    docker_wait_container_running(container)
+
+    while True:
+        try:
+            if requests.get(url='http://127.0.0.1:16686/search', timeout=0.1).status_code / 100 == 2:
+                break
+        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
+            pass
+        print('waiting for Jaeger to become ready')
+        time.sleep(0.1)
+
+    pipe_container_logs_to_file(container, 'jaeger.log')
+    print('Jaeger service ready')
+
+    return container
+
+
+def cargo_build_iox(debug=False):
+    t = time.time()
+    print('building IOx')
+
+    args = ['cargo', 'build']
+    if not debug:
+        args += ['--release']
+    args += ['--package', 'influxdb_iox', '--features', 'aws,jaeger', '--bin', 'influxdb_iox']
+    args += ['--package', 'iox_data_generator', '--bin', 'iox_data_generator', '--bin', 'create_database']
+    subprocess.run(args=args)
+
+    print('building IOx finished in %.2fs' % (time.time() - t))
+
+
+def exec_iox(id, service_name, debug=False, object_store='memory', print_only=False):
+    http_addr = 'localhost:%d' % (id * 10000 + 8080)
+    grpc_addr = 'localhost:%d' % (id * 10000 + 8082)
+
+    if debug:
+        iox_path = os.path.abspath(os.path.join(os.getcwd(), '../target/debug/influxdb_iox'))
+    else:
+        iox_path = os.path.abspath(os.path.join(os.getcwd(), '../target/release/influxdb_iox'))
+    args = [iox_path, 'run']
+    env = {
+        'INFLUXDB_IOX_ID': str(id),
+        'INFLUXDB_IOX_BIND_ADDR': http_addr,
+        'INFLUXDB_IOX_GRPC_BIND_ADDR': grpc_addr,
+        'INFLUXDB_IOX_BUCKET': 'iox%d' % id,
+        'LOG_DESTINATION': 'stdout',
+        'LOG_FORMAT': 'full',
+        'TRACES_EXPORTER': 'jaeger',
+        'TRACES_EXPORTER_JAEGER_AGENT_HOST': 'localhost',
+        'TRACES_EXPORTER_JAEGER_AGENT_PORT': '6831',
+        'TRACES_EXPORTER_JAEGER_SERVICE_NAME': service_name,
+        'TRACES_SAMPLER': 'always_on',
+        'RUST_BACKTRACE': '1',
+        'LOG_FILTER': 'debug,lifecycle=info,rusoto_core=warn,hyper=warn,h2=warn',
+    }
+
+    if object_store == 'memory':
+        env['INFLUXDB_IOX_OBJECT_STORE'] = 'memory'
+    elif object_store == 's3':
+        env['INFLUXDB_IOX_OBJECT_STORE'] = 's3'
+        env['AWS_ACCESS_KEY_ID'] = 'minio'
+        env['AWS_SECRET_ACCESS_KEY'] = 'miniominio'
+        env['AWS_ENDPOINT'] = 'http://localhost:9000'
+    elif object_store == 'file':
+        env['INFLUXDB_IOX_OBJECT_STORE'] = 'file'
+        env['INFLUXDB_IOX_DB_DIR'] = 'volumes/%s' % service_name
+    else:
+        raise ValueError('invalid object_store value "%s"' % object_store)
+
+    if print_only:
+        print()
+        for k in sorted(env.keys()):
+            print('%s=%s' % (k, env[k]))
+        print(' '.join(args))
+        print()
+        return None
+
+    log_file = open('logs/%s.log' % service_name, mode='w')
+    process = subprocess.Popen(args=args, env=env, stdout=log_file, stderr=log_file)
+
+    while True:
+        if process.poll() is not None:
+            raise ChildProcessError('service %s stopped unexpectedly, check %s' % (service_name, log_file.name))
+        router = grpc_requests.Client(grpc_addr, lazy=True)
+        while True:
+            try:
+                router.register_service('influxdata.iox.management.v1.ManagementService')
+                break
+            except:
+                # fall through to retry
+                pass
+        try:
+            server_status_response = router.request('influxdata.iox.management.v1.ManagementService', 'GetServerStatus', None)
+            if 'server_status' in server_status_response and server_status_response['server_status']['initialized'] is True:
+                break
+        except:
+            # fall through to retry
+            pass
+
+        print('waiting for %s to become ready' % service_name)
+        time.sleep(0.1)
+
+    print('%s service ready' % service_name)
+
+    return process
+
+
+def grpc_create_database(router_id, writer_id):
+    print('creating database "%s" on both IOx servers' % db_name)
+
+    router_db_rules = {
+        'rules': {
+            'name': db_name,
+            'partition_template': {
+                'parts': [
+                    {'time': '%Y-%m-%d %H:00:00'},
+                ],
+            },
+            'lifecycle_rules': {
+                'immutable': True,
+                'worker_backoff_millis': '1000',
+                'catalog_transactions_until_checkpoint': '100',
+                'late_arrive_window_seconds': 300,
+                'persist_row_threshold': '1000000',
+                'persist_age_threshold_seconds': 1800,
+                'mub_row_threshold': '100000',
+                'max_active_compactions_cpu_fraction': 1.0,
+            },
+            'routing_config': {'sink': {'kafka': {}}},
+            'worker_cleanup_avg_sleep': '500s',
+            'writing': '127.0.0.1:9093',
+        },
+    }
+
+    writer_db_rules = {
+        'rules': {
+            'name': db_name,
+            'partition_template': {
+                'parts': [
+                    {'time': '%Y-%m-%d %H:00:00'}
+                ],
+            },
+            'lifecycle_rules': {
+                'buffer_size_soft': 1024 * 1024 * 1024,
+                'buffer_size_hard': 1024 * 1024 * 1024 * 2,
+                'worker_backoff_millis': 100,
+                'max_active_compactions': 1,
+                'persist': True,
+                'persist_row_threshold': 10000000,
+                'catalog_transactions_until_checkpoint': 100,
+                'late_arrive_window_seconds': 300,
+                'persist_age_threshold_seconds': 1800,
+                'mub_row_threshold': 100000,
+            },
+            'routing_config': {'sink': {'kafka': {}}},
+            'worker_cleanup_avg_sleep': '500s',
+            'reading': '127.0.0.1:9093',
+        },
+    }
+
+    if router_id is not None:
+        router_grpc_addr = 'localhost:%d' % (router_id * 10000 + 8082)
+        router = grpc_requests.Client(router_grpc_addr, lazy=True)
+        router.register_service('influxdata.iox.management.v1.ManagementService')
+        router.request('influxdata.iox.management.v1.ManagementService', 'CreateDatabase', router_db_rules)
+
+        router_http_addr = 'localhost:%d' % (router_id * 10000 + 8080)
+        router_write_url = 'http://%s/api/v2/write?org=%s&bucket=%s' % (router_http_addr, org_name, bucket_name)
+        lp = "sentinel,source=perf.py f=1i"
+        response = requests.post(url=router_write_url, data=lp, timeout=10)
+        if not response.ok:
+            print('failed to write to router')
+            print(response.reason)
+            print(response.content)
+            return
+
+    else:
+        print()
+        print(router_db_rules)
+        print()
+
+    if writer_id is not None:
+        writer_grpc_addr = 'localhost:%d' % (writer_id * 10000 + 8082)
+        writer = grpc_requests.Client(writer_grpc_addr, lazy=True)
+        writer.register_service('influxdata.iox.management.v1.ManagementService')
+        writer.request('influxdata.iox.management.v1.ManagementService', 'CreateDatabase', writer_db_rules)
+
+        writer_http_addr = 'localhost:%d' % (writer_id * 10000 + 8080)
+        writer_query_url = 'http://%s/iox/api/v1/databases/%s/query' % (writer_http_addr, db_name)
+        writer_query_params = {'q': 'select count(1) from sentinel'}
+
+        response = requests.get(url=writer_query_url, params=writer_query_params, timeout=10)
+        for i in range(20):
+            if response.ok:
+                break
+            print('waiting for round trip test to succeed')
+            time.sleep(0.5)
+            response = requests.get(url=writer_query_url, params=writer_query_params, timeout=10)
+
+        if not response.ok:
+            print(response.reason)
+            print(response.content)
+            return
+
+    else:
+        print()
+        print(writer_db_rules)
+        print()
+
+    print('created database "%s" on both IOx servers' % db_name)
+
+
+def run_test_battery(battery_name, router_id, writer_id, debug=False):
+    print('starting test battery "%s"' % battery_name)
+
+    # Write
+
+    battery_dir = os.path.join(os.getcwd(), battery_name)
+    datagen_filename = os.path.join(battery_dir, 'datagen.toml')
+    if debug:
+        iox_data_generator_path = os.path.abspath(os.path.join(os.getcwd(), '../target/debug/iox_data_generator'))
+    else:
+        iox_data_generator_path = os.path.abspath(os.path.join(os.getcwd(), '../target/release/iox_data_generator'))
+
+    router_http_addr = 'localhost:%d' % (router_id * 10000 + 8080)
+    args = [iox_data_generator_path,
+            '--host', router_http_addr, '--token', 'arbitrary',
+            '--org', org_name, '--bucket', bucket_name,
+            '--spec', datagen_filename]
+    env = {
+        'RUST_BACKTRACE': '0',
+    }
+    log_file = open('logs/test.log', mode='w')
+    if subprocess.run(args=args, stdout=log_file, stderr=log_file, env=env).returncode != 0:
+        raise ChildProcessError(
+            'failed to run iox_data_generator for battery "%s", check %s' % (battery_name, log_file.name))
+
+    # Query
+
+    writer_http_addr = 'localhost:%d' % (writer_id * 10000 + 8080)
+    query_url = 'http://%s/iox/api/v1/databases/%s/query' % (writer_http_addr, db_name)
+    queries_filename = os.path.join(battery_dir, 'queries.toml')
+    queries = toml.load(open(queries_filename))
+
+    for query in queries['queries']:
+        if 'sql' not in query:
+            print('query missing SQL query')
+            print(query)
+            print()
+            continue
+        sql = query['sql']
+        name = query['name']
+        if name is None:
+            name = sql
+
+        print('running test "%s"' % name)
+        time_start = time.time()
+        params = {'q': sql, 'format': 'csv'}
+        response = requests.get(url=query_url, params=params)
+        time_delta = '%dms' % math.floor((time.time() - time_start) * 1000)
+
+        if not response.ok:
+            print(response.reason)
+            print(response.content.text)
+            print()
+            continue
+
+        got = response.content.text.strip()
+        print('time: %s' % time_delta)
+        if 'expect' in query:
+            expect = query['expect'].strip()
+            if expect != got:
+                print('expected: %s' % expect)
+                print('got: %s' % got)
+            else:
+                print('OK')
+
+        elif 'expect_filename' in query:
+            path = pathlib.Path(os.path.join(battery_dir, query['expect_filename']))
+            if not path.is_file():
+                print('file "%s" not found' % path)
+                print()
+                continue
+            expect = open(path).read().strip()
+            if expect != got:
+                print('expected: %s' % expect)
+                print('got: %s' % got)
+            else:
+                print('OK')
+        else:
+            print('OK')
+
+        print()
+
+    print('completed test battery "%s"' % battery_name)
+
+
+if __name__ == "__main__":
+    logging.getLogger('grpc_requests.client').setLevel(logging.ERROR)
+    main()
@@ -0,0 +1,9 @@
+docker>=5.0,<6
+grpc-requests>=0.1,<0.2
+grpcio>=1.39,<1.40
+grpcio-reflection>=1.39,<1.40
+protobuf>=3.17,<3.18
+minio
+requests
+toml
+urllib3
@@ -312,8 +312,8 @@ async fn sql_select_from_system_chunks() {
        "+----+---------------+------------+-------------------+--------------+-----------+",
        "| id | partition_key | table_name | storage | memory_bytes | row_count |",
        "+----+---------------+------------+-------------------+--------------+-----------+",
-        "| 0 | 1970-01-01T00 | h2o | OpenMutableBuffer | 213 | 3 |",
-        "| 0 | 1970-01-01T00 | o2 | OpenMutableBuffer | 177 | 2 |",
+        "| 0 | 1970-01-01T00 | h2o | OpenMutableBuffer | 1639 | 3 |",
+        "| 0 | 1970-01-01T00 | o2 | OpenMutableBuffer | 1635 | 2 |",
        "+----+---------------+------------+-------------------+--------------+-----------+",
    ];
    run_sql_test_case!(
@@ -368,15 +368,15 @@ async fn sql_select_from_system_chunk_columns() {
        "| 1970-01-01T00 | 0 | h2o | state | ReadBuffer | 2 | 0 | MA | MA | 347 |",
        "| 1970-01-01T00 | 0 | h2o | temp | ReadBuffer | 2 | 1 | 70.4 | 70.4 | 471 |",
        "| 1970-01-01T00 | 0 | h2o | time | ReadBuffer | 2 | 0 | 50 | 250 | 110 |",
-        "| 1970-01-01T00 | 0 | o2 | city | OpenMutableBuffer | 2 | 1 | Boston | Boston | 35 |",
-        "| 1970-01-01T00 | 0 | o2 | reading | OpenMutableBuffer | 2 | 1 | 51 | 51 | 25 |",
-        "| 1970-01-01T00 | 0 | o2 | state | OpenMutableBuffer | 2 | 0 | CA | MA | 41 |",
-        "| 1970-01-01T00 | 0 | o2 | temp | OpenMutableBuffer | 2 | 0 | 53.4 | 79 | 25 |",
-        "| 1970-01-01T00 | 0 | o2 | time | OpenMutableBuffer | 2 | 0 | 50 | 300 | 25 |",
-        "| 1970-01-01T00 | 1 | h2o | city | OpenMutableBuffer | 1 | 0 | Boston | Boston | 31 |",
-        "| 1970-01-01T00 | 1 | h2o | other_temp | OpenMutableBuffer | 1 | 0 | 72.4 | 72.4 | 17 |",
-        "| 1970-01-01T00 | 1 | h2o | state | OpenMutableBuffer | 1 | 0 | CA | CA | 27 |",
-        "| 1970-01-01T00 | 1 | h2o | time | OpenMutableBuffer | 1 | 0 | 350 | 350 | 17 |",
+        "| 1970-01-01T00 | 0 | o2 | city | OpenMutableBuffer | 2 | 1 | Boston | Boston | 309 |",
+        "| 1970-01-01T00 | 0 | o2 | reading | OpenMutableBuffer | 2 | 1 | 51 | 51 | 297 |",
+        "| 1970-01-01T00 | 0 | o2 | state | OpenMutableBuffer | 2 | 0 | CA | MA | 313 |",
+        "| 1970-01-01T00 | 0 | o2 | temp | OpenMutableBuffer | 2 | 0 | 53.4 | 79 | 297 |",
+        "| 1970-01-01T00 | 0 | o2 | time | OpenMutableBuffer | 2 | 0 | 50 | 300 | 297 |",
+        "| 1970-01-01T00 | 1 | h2o | city | OpenMutableBuffer | 1 | 0 | Boston | Boston | 309 |",
+        "| 1970-01-01T00 | 1 | h2o | other_temp | OpenMutableBuffer | 1 | 0 | 72.4 | 72.4 | 297 |",
+        "| 1970-01-01T00 | 1 | h2o | state | OpenMutableBuffer | 1 | 0 | CA | CA | 309 |",
+        "| 1970-01-01T00 | 1 | h2o | time | OpenMutableBuffer | 1 | 0 | 350 | 350 | 297 |",
        "+---------------+----------+------------+-------------+-------------------+-----------+------------+-----------+-----------+--------------+",
    ];
    run_sql_test_case!(
@@ -2107,13 +2107,18 @@ mod tests {
        assert_metric("catalog_loaded_rows", "object_store", 0.0);

        // verify chunk size updated
-        catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 44).unwrap();
+        catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 700)
+            .unwrap();

        // write into same chunk again.
        write_lp(db.as_ref(), "cpu bar=2 20").await;
+        write_lp(db.as_ref(), "cpu bar=3 30").await;
+        write_lp(db.as_ref(), "cpu bar=4 40").await;
+        write_lp(db.as_ref(), "cpu bar=5 50").await;

        // verify chunk size updated
-        catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 60).unwrap();
+        catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 764)
+            .unwrap();

        // Still only one chunk open
        test_db
@@ -2131,7 +2136,7 @@ mod tests {
        assert_metric("catalog_loaded_chunks", "mutable_buffer", 1.0);
        assert_metric("catalog_loaded_chunks", "read_buffer", 0.0);
        assert_metric("catalog_loaded_chunks", "object_store", 0.0);
-        assert_metric("catalog_loaded_rows", "mutable_buffer", 2.0);
+        assert_metric("catalog_loaded_rows", "mutable_buffer", 5.0);
        assert_metric("catalog_loaded_rows", "read_buffer", 0.0);
        assert_metric("catalog_loaded_rows", "object_store", 0.0);
@@ -2153,7 +2158,7 @@ mod tests {
        assert_metric("catalog_loaded_chunks", "mutable_buffer", 1.0);
        assert_metric("catalog_loaded_chunks", "read_buffer", 0.0);
        assert_metric("catalog_loaded_chunks", "object_store", 0.0);
-        assert_metric("catalog_loaded_rows", "mutable_buffer", 2.0);
+        assert_metric("catalog_loaded_rows", "mutable_buffer", 5.0);
        assert_metric("catalog_loaded_rows", "read_buffer", 0.0);
        assert_metric("catalog_loaded_rows", "object_store", 0.0);
@@ -2181,12 +2186,12 @@ mod tests {
        assert_metric("catalog_loaded_chunks", "read_buffer", 1.0);
        assert_metric("catalog_loaded_chunks", "object_store", 0.0);
        assert_metric("catalog_loaded_rows", "mutable_buffer", 0.0);
-        assert_metric("catalog_loaded_rows", "read_buffer", 2.0);
+        assert_metric("catalog_loaded_rows", "read_buffer", 5.0);
        assert_metric("catalog_loaded_rows", "object_store", 0.0);

        // verify chunk size updated (chunk moved from closing to moving to moved)
        catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 0).unwrap();
-        let expected_read_buffer_size = 1916;
+        let expected_read_buffer_size = 1922;
        catalog_chunk_size_bytes_metric_eq(
            &test_db.metric_registry,
            "read_buffer",
@@ -2234,8 +2239,8 @@ mod tests {
        assert_metric("catalog_loaded_chunks", "read_buffer", 1.0);
        assert_metric("catalog_loaded_chunks", "object_store", 1.0);
        assert_metric("catalog_loaded_rows", "mutable_buffer", 0.0);
-        assert_metric("catalog_loaded_rows", "read_buffer", 2.0);
-        assert_metric("catalog_loaded_rows", "object_store", 2.0);
+        assert_metric("catalog_loaded_rows", "read_buffer", 5.0);
+        assert_metric("catalog_loaded_rows", "object_store", 5.0);

        db.unload_read_buffer("cpu", "1970-01-01T00", 1).unwrap();
@@ -2253,7 +2258,7 @@ mod tests {
        assert_metric("catalog_loaded_chunks", "object_store", 1.0);
        assert_metric("catalog_loaded_rows", "mutable_buffer", 0.0);
        assert_metric("catalog_loaded_rows", "read_buffer", 0.0);
-        assert_metric("catalog_loaded_rows", "object_store", 2.0);
+        assert_metric("catalog_loaded_rows", "object_store", 5.0);

        // verify chunk size not increased for OS (it was in OS before unload)
        catalog_chunk_size_bytes_metric_eq(
@@ -2574,7 +2579,7 @@ mod tests {
                ("svr_id", "1"),
            ])
            .histogram()
-            .sample_sum_eq(280.0)
+            .sample_sum_eq(5085.0)
            .unwrap();

        // RB chunk size
@@ -3161,7 +3166,7 @@ mod tests {
            id: 0,
            storage: ChunkStorage::OpenMutableBuffer,
            lifecycle_action: None,
-            memory_bytes: 70, // memory_size
+            memory_bytes: 1006, // memory_size
            object_store_bytes: 0, // os_size
            row_count: 1,
            time_of_last_access: None,
@@ -3479,7 +3484,7 @@ mod tests {
            id: 1,
            storage: ChunkStorage::OpenMutableBuffer,
            lifecycle_action,
-            memory_bytes: 87,
+            memory_bytes: 1303,
            object_store_bytes: 0, // no OS chunks
            row_count: 1,
            time_of_last_access: None,
@@ -3501,7 +3506,7 @@ mod tests {
            );
        }

-        assert_eq!(db.catalog.metrics().memory().mutable_buffer(), 2486 + 87);
+        assert_eq!(db.catalog.metrics().memory().mutable_buffer(), 2486 + 1303);
        assert_eq!(db.catalog.metrics().memory().read_buffer(), 2766);
        assert_eq!(db.catalog.metrics().memory().object_store(), 2007);
    }
@@ -276,7 +276,7 @@ impl CatalogChunk {
            .state
            .inc_with_attributes(&[KeyValue::new("state", "open")]);

-        let mut chunk = Self {
+        let chunk = Self {
            addr,
            stage,
            lifecycle_action: None,
@@ -313,7 +313,7 @@ impl CatalogChunk {
            .state
            .inc_with_attributes(&[KeyValue::new("state", "compacted")]);

-        let mut chunk = Self {
+        let chunk = Self {
            addr,
            stage,
            lifecycle_action: None,
@@ -350,7 +350,7 @@ impl CatalogChunk {
            meta,
        };

-        let mut chunk = Self {
+        let chunk = Self {
            addr,
            stage,
            lifecycle_action: None,
@@ -412,7 +412,7 @@ impl CatalogChunk {
    }

    /// Updates `self.metrics` to match the contents of `self.stage`
-    fn update_metrics(&mut self) {
+    pub fn update_metrics(&self) {
        match &self.stage {
            ChunkStage::Open { mb_chunk } => {
                self.metrics.memory_metrics.set_mub_only(mb_chunk.size());
@@ -627,7 +627,7 @@ impl CatalogChunk {
        assert!(self.time_closed.is_none());

        self.time_closed = Some(Utc::now());
-        let s = mb_chunk.snapshot();
+        let (s, _) = mb_chunk.snapshot();
        self.metrics
            .state
            .inc_with_attributes(&[KeyValue::new("state", "closed")]);
@@ -880,9 +880,7 @@ impl CatalogChunk {
        self.set_lifecycle_action(ChunkLifecycleAction::Dropping, registration)?;

        // set memory metrics to 0 to stop accounting for this chunk within the catalog
-        self.metrics.memory_metrics.mutable_buffer.set(0);
-        self.metrics.memory_metrics.read_buffer.set(0);
-        self.metrics.memory_metrics.object_store.set(0);
+        self.metrics.memory_metrics.set_to_zero();

        Ok(())
    }
@@ -214,82 +214,122 @@ impl PartitionMetrics {
 ///
 /// This can then be used within each `CatalogChunk` to record its observations for
 /// the different storages
-#[derive(Debug)]
 pub struct StorageGauge {
-    pub(super) mutable_buffer: GaugeValue,
-    pub(super) read_buffer: GaugeValue,
-    pub(super) object_store: GaugeValue,
+    inner: Mutex<StorageGaugeInner>,
 }

+impl std::fmt::Debug for StorageGauge {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("StorageGauge").finish_non_exhaustive()
+    }
+}
+
+struct StorageGaugeInner {
+    mutable_buffer: GaugeValue,
+    read_buffer: GaugeValue,
+    object_store: GaugeValue,
+}
+
 impl StorageGauge {
     pub(super) fn new_unregistered() -> Self {
-        Self {
+        let inner = StorageGaugeInner {
             mutable_buffer: GaugeValue::new_unregistered(),
             read_buffer: GaugeValue::new_unregistered(),
             object_store: GaugeValue::new_unregistered(),
+        };
+        Self {
+            inner: Mutex::new(inner),
         }
     }

     pub(super) fn new(gauge: &Gauge) -> Self {
-        Self {
+        let inner = StorageGaugeInner {
             mutable_buffer: gauge.gauge_value(&[KeyValue::new("location", "mutable_buffer")]),
             read_buffer: gauge.gauge_value(&[KeyValue::new("location", "read_buffer")]),
             object_store: gauge.gauge_value(&[KeyValue::new("location", "object_store")]),
+        };
+        Self {
+            inner: Mutex::new(inner),
         }
     }

-    pub(super) fn set_mub_only(&mut self, value: usize) {
-        self.mutable_buffer.set(value);
-        self.read_buffer.set(0);
-        self.object_store.set(0);
+    pub(super) fn set_mub_only(&self, value: usize) {
+        let mut guard = self.inner.lock();
+
+        guard.mutable_buffer.set(value);
+        guard.read_buffer.set(0);
+        guard.object_store.set(0);
     }

-    pub(super) fn set_rub_only(&mut self, value: usize) {
-        self.mutable_buffer.set(0);
-        self.read_buffer.set(value);
-        self.object_store.set(0);
+    pub(super) fn set_rub_only(&self, value: usize) {
+        let mut guard = self.inner.lock();
+
+        guard.mutable_buffer.set(0);
+        guard.read_buffer.set(value);
+        guard.object_store.set(0);
     }

-    pub(super) fn set_rub_and_object_store_only(&mut self, rub: usize, parquet: usize) {
-        self.mutable_buffer.set(0);
-        self.read_buffer.set(rub);
-        self.object_store.set(parquet);
+    pub(super) fn set_rub_and_object_store_only(&self, rub: usize, parquet: usize) {
+        let mut guard = self.inner.lock();
+
+        guard.mutable_buffer.set(0);
+        guard.read_buffer.set(rub);
+        guard.object_store.set(parquet);
     }

-    pub(super) fn set_object_store_only(&mut self, value: usize) {
-        self.mutable_buffer.set(0);
-        self.read_buffer.set(0);
-        self.object_store.set(value);
+    pub(super) fn set_object_store_only(&self, value: usize) {
+        let mut guard = self.inner.lock();
+
+        guard.mutable_buffer.set(0);
+        guard.read_buffer.set(0);
+        guard.object_store.set(value);
     }

+    pub(super) fn set_to_zero(&self) {
+        let mut guard = self.inner.lock();
+
+        guard.mutable_buffer.set(0);
+        guard.read_buffer.set(0);
+        guard.object_store.set(0);
+    }
+
     fn clone_empty(&self) -> Self {
+        let guard = self.inner.lock();
+
+        let inner = StorageGaugeInner {
+            mutable_buffer: guard.mutable_buffer.clone_empty(),
+            read_buffer: guard.read_buffer.clone_empty(),
+            object_store: guard.object_store.clone_empty(),
+        };
         Self {
-            mutable_buffer: self.mutable_buffer.clone_empty(),
-            read_buffer: self.read_buffer.clone_empty(),
-            object_store: self.object_store.clone_empty(),
+            inner: Mutex::new(inner),
         }
     }

     /// Returns the total for the mutable buffer
     pub fn mutable_buffer(&self) -> usize {
-        self.mutable_buffer.get_total()
+        let guard = self.inner.lock();
+        guard.mutable_buffer.get_total()
     }

     /// Returns the total for the read buffer
     pub fn read_buffer(&self) -> usize {
-        self.read_buffer.get_total()
+        let guard = self.inner.lock();
+        guard.read_buffer.get_total()
     }

     /// Returns the total for object storage
     pub fn object_store(&self) -> usize {
-        self.object_store.get_total()
+        let guard = self.inner.lock();
+        guard.object_store.get_total()
     }

     /// Returns the total over all storages
     pub fn total(&self) -> usize {
-        self.mutable_buffer.get_total()
-            + self.read_buffer.get_total()
-            + self.object_store.get_total()
+        let guard = self.inner.lock();
+        guard.mutable_buffer.get_total()
+            + guard.read_buffer.get_total()
+            + guard.object_store.get_total()
     }
 }
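The gauge fields move behind a `Mutex` so the setters can take `&self` instead of `&mut self`, which is what allows `CatalogChunk::update_metrics` above to be callable through a shared reference. A minimal, self-contained sketch of that interior-mutability pattern (using `std::sync::Mutex`, with `GaugeValue` stubbed out as a plain number):

```rust
use std::sync::Mutex;

// Callers hold only `&self`; the lock serializes updates to the
// three per-location values.
struct GaugeInner {
    mutable_buffer: f64,
    read_buffer: f64,
    object_store: f64,
}

struct Gauge {
    inner: Mutex<GaugeInner>,
}

impl Gauge {
    fn set_mub_only(&self, value: f64) {
        let mut guard = self.inner.lock().unwrap();
        guard.mutable_buffer = value;
        guard.read_buffer = 0.0;
        guard.object_store = 0.0;
    }

    fn total(&self) -> f64 {
        let guard = self.inner.lock().unwrap();
        guard.mutable_buffer + guard.read_buffer + guard.object_store
    }
}

fn main() {
    let gauge = Gauge {
        inner: Mutex::new(GaugeInner { mutable_buffer: 0.0, read_buffer: 0.0, object_store: 0.0 }),
    };
    gauge.set_mub_only(42.0); // no `&mut` needed thanks to the Mutex
    assert_eq!(gauge.total(), 42.0);
}
```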
@@ -110,7 +110,13 @@ impl DbChunk {

        let (state, meta) = match chunk.stage() {
            ChunkStage::Open { mb_chunk, .. } => {
-                let snapshot = mb_chunk.snapshot();
+                let (snapshot, just_cached) = mb_chunk.snapshot();
+
+                // the snapshot might be cached, so we need to update the chunk metrics
+                if just_cached {
+                    chunk.update_metrics();
+                }
+
                let state = State::MutableBuffer {
                    chunk: Arc::clone(&snapshot),
                };
@@ -742,7 +742,7 @@ mod tests {
                tokio::sync::Mutex::new(Box::new(write_buffer) as _),
            )))
            .lifecycle_rules(data_types::database_rules::LifecycleRules {
-                buffer_size_hard: Some(NonZeroUsize::new(10_000).unwrap()),
+                buffer_size_hard: Some(NonZeroUsize::new(12_000).unwrap()),
                late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
                catalog_transactions_until_checkpoint,
                mub_row_threshold: NonZeroUsize::new(10).unwrap(),
@@ -524,7 +524,7 @@ async fn test_chunk_get() {
            id: 0,
            storage: ChunkStorage::OpenMutableBuffer.into(),
            lifecycle_action,
-            memory_bytes: 100,
+            memory_bytes: 1016,
            object_store_bytes: 0,
            row_count: 2,
            time_of_last_access: None,
@@ -538,7 +538,7 @@ async fn test_chunk_get() {
            id: 0,
            storage: ChunkStorage::OpenMutableBuffer.into(),
            lifecycle_action,
-            memory_bytes: 82,
+            memory_bytes: 1018,
            object_store_bytes: 0,
            row_count: 1,
            time_of_last_access: None,
@@ -709,7 +709,7 @@ async fn test_list_partition_chunks() {
            id: 0,
            storage: ChunkStorage::OpenMutableBuffer.into(),
            lifecycle_action: ChunkLifecycleAction::Unspecified.into(),
-            memory_bytes: 100,
+            memory_bytes: 1016,
            object_store_bytes: 0,
            row_count: 2,
            time_of_last_access: None,
@@ -365,7 +365,7 @@ async fn test_get_chunks() {
        .and(predicate::str::contains(
            r#""storage": "OpenMutableBuffer","#,
        ))
-        .and(predicate::str::contains(r#""memory_bytes": 100"#))
+        .and(predicate::str::contains(r#""memory_bytes": 1016"#))
        // Check for a non empty timestamp such as
        // "time_of_first_write": "2021-03-30T17:11:10.723866Z",
        .and(predicate::str::contains(r#""time_of_first_write": "20"#));
@@ -705,7 +705,8 @@ pub async fn fixture_replay_broken(db_name: &str, kafka_connection: &str) -> Ser
        .unwrap();

    // ingest data as mixed throughput
-    let producer = KafkaBufferProducer::new(kafka_connection, db_name).unwrap();
+    let producer =
+        KafkaBufferProducer::new(kafka_connection, db_name, &Default::default()).unwrap();
    producer
        .store_entry(
            &lp_to_entries("table_1,partition_by=a foo=1 10", &partition_template)
@@ -109,7 +109,8 @@ impl WriteBufferConfigFactory {

        let writer = match &cfg.type_[..] {
            "kafka" => {
-                let kafka_buffer = KafkaBufferProducer::new(&cfg.connection, db_name)?;
+                let kafka_buffer =
+                    KafkaBufferProducer::new(&cfg.connection, db_name, &cfg.connection_config)?;
                Arc::new(kafka_buffer) as _
            }
            "mock" => match self.get_mock(&cfg.connection)? {
@@ -140,8 +141,13 @@ impl WriteBufferConfigFactory {

        let reader = match &cfg.type_[..] {
            "kafka" => {
-                let kafka_buffer =
-                    KafkaBufferConsumer::new(&cfg.connection, server_id, db_name).await?;
+                let kafka_buffer = KafkaBufferConsumer::new(
+                    &cfg.connection,
+                    server_id,
+                    db_name,
+                    &cfg.connection_config,
+                )
+                .await?;
                Box::new(kafka_buffer) as _
            }
            "mock" => match self.get_mock(&cfg.connection)? {
@@ -1,5 +1,5 @@
 use std::{
-    collections::BTreeMap,
+    collections::{BTreeMap, HashMap},
     convert::{TryFrom, TryInto},
     sync::Arc,
     time::Duration,
@@ -90,6 +90,7 @@ impl KafkaBufferProducer {
    pub fn new(
        conn: impl Into<String>,
        database_name: impl Into<String>,
+        connection_config: &HashMap<String, String>,
    ) -> Result<Self, KafkaError> {
        let conn = conn.into();
        let database_name = database_name.into();
@@ -102,6 +103,9 @@ impl KafkaBufferProducer {
        cfg.set("request.required.acks", "all"); // equivalent to acks=-1
        cfg.set("compression.type", "snappy");
        cfg.set("statistics.interval.ms", "15000");
+        for (k, v) in connection_config {
+            cfg.set(k, v);
+        }

        let producer: FutureProducer = cfg.create()?;
@@ -246,6 +250,7 @@ impl KafkaBufferConsumer {
        conn: impl Into<String> + Send + Sync,
        server_id: ServerId,
        database_name: impl Into<String> + Send + Sync,
+        connection_config: &HashMap<String, String>,
    ) -> Result<Self, KafkaError> {
        let conn = conn.into();
        let database_name = database_name.into();
@@ -256,6 +261,9 @@ impl KafkaBufferConsumer {
        cfg.set("enable.auto.commit", "false");
        cfg.set("statistics.interval.ms", "15000");
        cfg.set("queued.max.messages.kbytes", "10000");
+        for (k, v) in connection_config {
+            cfg.set(k, v);
+        }

        // Create a unique group ID for this database's consumer as we don't want to create
        // consumer groups.
@@ -484,15 +492,20 @@ mod tests {
        type Reading = KafkaBufferConsumer;

        fn writing(&self) -> Self::Writing {
-            KafkaBufferProducer::new(&self.conn, &self.database_name).unwrap()
+            KafkaBufferProducer::new(&self.conn, &self.database_name, &Default::default()).unwrap()
        }

        async fn reading(&self) -> Self::Reading {
            let server_id = self.server_id_counter.fetch_add(1, Ordering::SeqCst);
            let server_id = ServerId::try_from(server_id).unwrap();
-            KafkaBufferConsumer::new(&self.conn, server_id, &self.database_name)
-                .await
-                .unwrap()
+            KafkaBufferConsumer::new(
+                &self.conn,
+                server_id,
+                &self.database_name,
+                &Default::default(),
+            )
+            .await
+            .unwrap()
        }
    }
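The new `connection_config: &HashMap<String, String>` parameter is applied entry by entry with `cfg.set(k, v)` after the built-in defaults, so callers can layer extra (or overriding) librdkafka settings on top of them, and `&Default::default()` keeps the old behaviour. A hedged, standalone sketch of building such a map; the option names below are illustrative librdkafka settings, not taken from this diff:

```rust
use std::collections::HashMap;

fn main() {
    // Sketch of the map that the new
    // `KafkaBufferProducer::new(conn, db_name, &connection_config)` signature accepts.
    let mut connection_config: HashMap<String, String> = HashMap::new();
    connection_config.insert("message.max.bytes".to_string(), "10000000".to_string());
    connection_config.insert("compression.type".to_string(), "lz4".to_string());

    // An empty map keeps the defaults, which is what the tests above express
    // with `&Default::default()`.
    let defaults: HashMap<String, String> = Default::default();
    assert!(defaults.is_empty());

    // Each entry ends up as a `cfg.set(k, v)` call on the Kafka client config.
    for (k, v) in &connection_config {
        println!("cfg.set({:?}, {:?})", k, v);
    }
}
```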