diff --git a/iox_data_generator/README.md b/iox_data_generator/README.md
index a6b1f717fb..21da13fdcd 100644
--- a/iox_data_generator/README.md
+++ b/iox_data_generator/README.md
@@ -67,7 +67,7 @@ instance to write to Kafka and the database in the "reader" IOx instance to read
 you run it with:
 
 ```
-cargo run -p iox_data_generator --bin create_database -- --writer 127.0.0.1:8082 --reader 127.0.0.1:8086 mlb_pirates
+cargo run --release -p iox_data_generator --bin create_database -- --writer 127.0.0.1:8082 --reader 127.0.0.1:8086 mlb_pirates
 ```
 
 This script adds 3 rows to a `writer_test` table because [this issue with the Kafka Consumer
@@ -81,7 +81,7 @@ from an existing schema as a guide. In this example, we're going to use
 Next, run the data generation tool as follows:
 
 ```
-cargo run -p iox_data_generator -- --spec iox_data_generator/schemas/cap-write.toml --continue --host 127.0.0.1:8080 --token arbitrary --org mlb --bucket pirates
+cargo run --release -p iox_data_generator -- --spec iox_data_generator/schemas/cap-write.toml --continue --host 127.0.0.1:8080 --token arbitrary --org mlb --bucket pirates
 ```
 
 - `--spec iox_data_generator/schemas/cap-write.toml` sets the schema you want to use to generate the data
@@ -100,7 +100,7 @@ is,
 
 ```
 # in your influxdb_iox checkout
-cargo run -- sql -h http://127.0.0.1:8086
+cargo run --release -- sql -h http://127.0.0.1:8086
 ```
 
 Connecting to the writer instance won't show any data.
diff --git a/perf/.gitignore b/perf/.gitignore
new file mode 100644
index 0000000000..a787156826
--- /dev/null
+++ b/perf/.gitignore
@@ -0,0 +1,2 @@
+logs/
+volumes/
diff --git a/perf/README.md b/perf/README.md
new file mode 100644
index 0000000000..de6aa4d313
--- /dev/null
+++ b/perf/README.md
@@ -0,0 +1,134 @@
+# Performance Tests
+
+This tool starts a complete test environment:
+
+- Kafka (docker)
+- Minio (docker)
+- Jaeger (docker)
+- IOx router (local process)
+- IOx writer (local process)
+- test battery:
+  - generate data with iox_data_generator
+  - query data and benchmark
+
+Logs live in `perf/logs`.
+While perf.py is running, you can follow a service's log with `tail -f logs/iox_router.log`.
+After perf.py exits, log files are closed.
+When perf.py is run again, old log files are deleted.
+
+Persistence volumes live in `perf/volumes`.
+As with log files, volume data remains after perf.py exits and is deleted when perf.py is run again.
+
+## Test Batteries
+
+A test battery is composed of:
+- a directory, named for the battery name
+  - data generator spec, in file `datagen.toml`
+  - query tests, in file `queries.toml`
+    - SQL query
+    - (optional) name of query test
+    - (optional) expected results, as a string or as a file
+
+The data generator spec format is that defined by `iox_data_generator`.
+[Read about that here](../iox_data_generator/README.md).
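+
+For example, the `battery-0` battery added later in this diff is laid out as:
+
+```console
+battery-0/
+├── datagen.toml    # data generator spec
+├── queries.toml    # query tests
+└── foo.csv         # expected query results, referenced from queries.toml
+```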
+
+The query tests file looks like this:
+```toml
+[[queries]]
+name = "example query, no expected result"
+sql = "select count(*) from cpu"
+
+[[queries]]
+name = "example query, expected result in string"
+sql = "select count(*) from cpu"
+expect = """
+COUNT(Uint8(1))
+3
+"""
+
+[[queries]]
+name = "example query, expected result in file"
+sql = "select count(*) from cpu"
+expect_filename = "foo.csv"
+```
+
+## Usage
+
+Help:
+```console
+perf/perf.py --help
+```
+
+Run all test batteries:
+```console
+perf/perf.py all
+```
+
+Run test batteries `battery-0` and `battery-1`:
+```console
+perf/perf.py battery-0 battery-1
+```
+
+Keep all processes running after test batteries have completed:
+```console
+perf/perf.py battery-0 --hold
+```
+
+Do not run any tests, just create an empty playground environment:
+```console
+perf/perf.py --hold
+```
+
+Do not build IOx:
+```console
+perf/perf.py --skip-build
+```
+
+Use debug binaries (`target/debug`) rather than release binaries:
+```console
+perf/perf.py --debug
+```
+
+Use Kafka/Zookeeper instead of Redpanda:
+```console
+perf/perf.py --kafka-zookeeper
+```
+
+Use in-memory object store implementation, instead of S3/Minio:
+```console
+perf/perf.py --object-store memory
+```
+
+Use file object store implementation, instead of S3/Minio:
+```console
+perf/perf.py --object-store file
+```
+
+Just delete docker containers and network, then exit.
+In the future, this will also detect orphaned IOx processes generated by perf, and delete those too:
+```console
+perf/perf.py --cleanup
+```
+
+## Install
+
+Install Docker:
+https://www.docker.com/products/docker-desktop
+
+Install python3:
+
+```console
+brew install python3
+```
+
+or:
+
+```console
+apt install python3 python3-pip
+```
+
+Install the required Python packages:
+
+```console
+python3 -m pip install -r perf/requirements.txt
+```
diff --git a/perf/battery-0/datagen.toml b/perf/battery-0/datagen.toml
new file mode 100644
index 0000000000..f34fd747f1
--- /dev/null
+++ b/perf/battery-0/datagen.toml
@@ -0,0 +1,18 @@
+name = "example"
+
+[[agents]]
+name = "cap_write_{{agent_id}}"
+count = 3
+sampling_interval = "10s"
+
+[[agents.measurements]]
+name = "cpu"
+
+[[agents.measurements.tags]]
+name = "host"
+value = "host-{{agent_id}}"
+
+[[agents.measurements.fields]]
+name = "usage_user"
+f64_range = [0.0, 100.0]
+
diff --git a/perf/battery-0/foo.csv b/perf/battery-0/foo.csv
new file mode 100644
index 0000000000..aeec46b5c4
--- /dev/null
+++ b/perf/battery-0/foo.csv
@@ -0,0 +1,2 @@
+COUNT(Uint8(1))
+3
diff --git a/perf/battery-0/queries.toml b/perf/battery-0/queries.toml
new file mode 100644
index 0000000000..a22c1f40bb
--- /dev/null
+++ b/perf/battery-0/queries.toml
@@ -0,0 +1,16 @@
+[[queries]]
+name = "example query, no expected result"
+sql = "select count(*) from cpu"
+
+[[queries]]
+name = "example query, expected result in string"
+sql = "select count(*) from cpu"
+expect = """
+COUNT(Uint8(1))
+3
+"""
+
+[[queries]]
+name = "example query, expected result in file"
+sql = "select count(*) from cpu"
+expect_filename = "foo.csv"
diff --git a/perf/perf.py b/perf/perf.py
new file mode 100755
index 0000000000..510f2586b6
--- /dev/null
+++ b/perf/perf.py
@@ -0,0 +1,633 @@
+#!/usr/bin/env python3
+
+import argparse
+import glob
+import logging
+import math
+import os
+import pathlib
+import shutil
+import signal
+import socket
+import subprocess
+import threading
+import time
+
+import docker
+import grpc_requests
+import minio
+import requests
+import toml
+import urllib3
+
+ioxperf_name = "ioxperf"
"ioxperf" +ioxperf_labels = {ioxperf_name: None} +ioxperf_filters = {'label': ioxperf_name} +org_name = 'myorg' +bucket_name = 'mybucket' +db_name = '%s_%s' % (org_name, bucket_name) + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument('--skip-build', help='do not build IOx, execute existing binaries', action='store_true') + parser.add_argument('--debug', help='build/execute debug IOx binaries instead of release', action='store_true') + parser.add_argument('--object-store', help='object store type', default='s3', choices=('memory', 's3', 'file')) + parser.add_argument('--kafka-zookeeper', help='use Kafka/ZooKeeper instead of Redpanda', action='store_true') + parser.add_argument('--hold', help='keep all services running after tests complete', action='store_true') + parser.add_argument('--cleanup', help='remove Docker assets and exit (TODO terminate IOx processes)', action='store_true') + parser.add_argument('batteries', help='name of directories containing test batteries, or "all"', nargs='*') + args = parser.parse_args() + + os.chdir(os.path.dirname(os.path.abspath(__file__))) + + dc = docker.from_env() + if args.cleanup: + docker_cleanup_resources(dc) + return + cleanup_logs_and_volumes(dc) + + batteries = args.batteries + if batteries == ['all']: + batteries = ( + p.relative_to(os.getcwd()) + for p + in pathlib.Path(os.getcwd()).iterdir() + if p.joinpath('datagen.toml').is_file() + ) + else: + for battery in batteries: + p = pathlib.Path(os.getcwd()).joinpath(battery, 'datagen.toml') + if not p.is_file(): + print('invalid battery "%s" - does not contain datagen.toml' % battery) + exit(1) + + processes = {} + + try: + if not args.skip_build: + cargo_build_iox(args.debug) + + docker_create_network(dc) + if args.kafka_zookeeper: + docker_run_zookeeper(dc) + docker_run_kafka(dc) + else: + docker_run_redpanda(dc) + if args.object_store == 's3': + docker_run_minio(dc) + docker_run_jaeger(dc) + processes['iox_router'] = exec_iox(1, 'iox_router', debug=args.debug, object_store=args.object_store) + processes['iox_writer'] = exec_iox(2, 'iox_writer', debug=args.debug, object_store=args.object_store) + grpc_create_database(1, 2) + + print('-' * 40) + for battery in batteries: + run_test_battery(battery, 1, 2, debug=args.debug) + print('-' * 40) + + except Exception as e: + print(e) + + if args.hold: + print('subprocesses are still running, ctrl-C to terminate and exit') + try: + signal.pause() + except KeyboardInterrupt: + pass + print('-' * 40) + + for service_name, process in processes.items(): + if process is None: + continue + print('%s <- SIGTERM' % service_name) + process.send_signal(signal.SIGTERM) + exit_code = process.wait(1.0) + if exit_code is None: + print('%s <- SIGKILL' % service_name) + process.send_signal(signal.SIGKILL) + if exit_code != 0: + print('%s exited with %d' % (service_name, exit_code)) + docker_cleanup_resources(dc) + + +def docker_cleanup_resources(dc): + containers = dc.containers.list(all=True, filters=ioxperf_filters) + if len(containers) > 0: + print('removing containers: %s' % ', '.join((c.name for c in containers))) + for container in containers: + container.remove(v=True, force=True) + + networks = dc.networks.list(filters=ioxperf_filters) + if len(networks) > 0: + print('removing networks: %s' % ', '.join((n.name for n in networks))) + for network in networks: + network.remove() + + +def cleanup_logs_and_volumes(dc): + docker_cleanup_resources(dc) + + volume_paths = glob.glob(os.path.join(os.getcwd(), 'volumes', '*')) + if 
+    if len(volume_paths) > 0:
+        print('removing volume contents: %s' % ', '.join((os.path.relpath(p) for p in volume_paths)))
+        for path in volume_paths:
+            shutil.rmtree(path)
+
+    log_paths = glob.glob(os.path.join(os.getcwd(), 'logs', '*'))
+    if len(log_paths) > 0:
+        print('removing logs: %s' % ', '.join((os.path.relpath(p) for p in log_paths)))
+        for path in log_paths:
+            os.unlink(path)
+
+
+def docker_create_network(dc):
+    dc.networks.create(name=ioxperf_name, driver='bridge', check_duplicate=True, scope='local',
+                       labels=ioxperf_labels)
+
+
+def docker_pull_image_if_needed(dc, image):
+    try:
+        dc.images.get(image)
+    except docker.errors.ImageNotFound:
+        print("pulling image '%s'..." % image)
+        dc.images.pull(image)
+
+
+def docker_wait_container_running(container):
+    while True:
+        container.reload()
+        if container.status == 'running':
+            print("container '%s' has started" % container.name)
+            return
+        elif container.status == 'created':
+            print("waiting for container '%s' to start" % container.name)
+            time.sleep(0.1)
+            continue
+        # any other status means the container failed to start
+        raise Exception("container '%s' status '%s' unexpected" % (container.name, container.status))
+
+
+def pipe_container_logs_to_file(container, log_filename):
+    # ensure the logs directory exists
+    dir_path = pathlib.Path(os.path.join(os.getcwd(), 'logs'))
+    if not dir_path.exists():
+        os.mkdir(dir_path, mode=0o777)
+
+    logs = container.logs(stdout=True, stderr=True, stream=True, follow=True)
+    f = open(file=os.path.join(os.getcwd(), 'logs', log_filename), mode='wb', buffering=0)
+
+    def thread_function():
+        for entry in logs:
+            f.write(entry)
+            f.flush()
+        f.close()
+
+    threading.Thread(target=thread_function, daemon=True).start()
+
+
+def check_port_open(addr, port):
+    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    port_open = sock.connect_ex((addr, port)) == 0
+    sock.close()
+    return port_open
+
+
+def docker_run_redpanda(dc):
+    image = 'vectorized/redpanda:v21.7.6'
+    command = ['redpanda', 'start',
+               '--overprovisioned', '--smp 1', '--memory 128M', '--reserve-memory', '0M', '--node-id', '0',
+               '--check=false', '--kafka-addr', 'CLIENT://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093',
+               '--advertise-kafka-addr', 'CLIENT://kafka:9092,EXTERNAL://localhost:9093']
+    name = '%s-%s' % (ioxperf_name, 'redpanda')
+    ports = {'9093/tcp': 9093}
+    volumes = {os.path.join(os.getcwd(), 'volumes/redpanda'): {
+        'bind': '/var/lib/redpanda/data',
+        'mode': 'rw',
+    }}
+    docker_pull_image_if_needed(dc, image)
+    container = dc.containers.run(image=image, command=command, detach=True, name=name, hostname='kafka',
+                                  labels=ioxperf_labels, network=ioxperf_name, ports=ports, volumes=volumes)
+    docker_wait_container_running(container)
+
+    while True:
+        if check_port_open('127.0.0.1', 9093):
+            break
+        print('waiting for Redpanda to become ready')
+        time.sleep(0.1)
+
+    pipe_container_logs_to_file(container, 'redpanda.log')
+    print('Redpanda service is ready')
+
+    return container
+
+
+def docker_run_zookeeper(dc):
+    image = 'docker.io/bitnami/zookeeper:3'
+    name = '%s-%s' % (ioxperf_name, 'zookeeper')
+    ports = {'2181/tcp': 2181}
+    env = {
+        'ALLOW_ANONYMOUS_LOGIN': 'yes',
+    }
+    volumes = {os.path.join(os.getcwd(), 'volumes/zookeeper'): {
+        'bind': '/bitnami/zookeeper',
+        'mode': 'rw',
+    }}
+    docker_pull_image_if_needed(dc, image)
+    container = dc.containers.run(image=image, detach=True, environment=env, name=name, hostname='zookeeper',
+                                  labels=ioxperf_labels, network=ioxperf_name, ports=ports, volumes=volumes)
+    docker_wait_container_running(container)
+
+    while True:
+        if check_port_open('127.0.0.1', 2181):
+            break
+        print('waiting for ZooKeeper to become ready')
+        time.sleep(0.1)
+
+    pipe_container_logs_to_file(container, 'zookeeper.log')
+    print('ZooKeeper service is ready')
+
+    return container
+
+
+def docker_run_kafka(dc):
+    image = 'docker.io/bitnami/kafka:2'
+    name = '%s-%s' % (ioxperf_name, 'kafka')
+    ports = {'9093/tcp': 9093}
+    env = {
+        'KAFKA_CFG_ZOOKEEPER_CONNECT': 'zookeeper:2181',
+        'ALLOW_PLAINTEXT_LISTENER': 'yes',
+        'KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP': 'CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT',
+        'KAFKA_CFG_LISTENERS': 'CLIENT://:9092,EXTERNAL://:9093',
+        'KAFKA_CFG_ADVERTISED_LISTENERS': 'CLIENT://kafka:9092,EXTERNAL://localhost:9093',
+        'KAFKA_INTER_BROKER_LISTENER_NAME': 'CLIENT',
+        'KAFKA_CFG_LOG_RETENTION_CHECK_INTERVAL_MS': '100',
+    }
+    volumes = {os.path.join(os.getcwd(), 'volumes/kafka'): {
+        'bind': '/bitnami/kafka',
+        'mode': 'rw',
+    }}
+    docker_pull_image_if_needed(dc, image)
+    container = dc.containers.run(image=image, detach=True, environment=env, name=name, hostname='kafka',
+                                  labels=ioxperf_labels, network=ioxperf_name, ports=ports, volumes=volumes)
+    docker_wait_container_running(container)
+
+    while True:
+        if check_port_open('127.0.0.1', 9093):
+            break
+        print('waiting for Kafka to become ready')
+        time.sleep(0.1)
+
+    pipe_container_logs_to_file(container, 'kafka.log')
+    print('Kafka service is ready')
+
+    return container
+
+
+def docker_run_minio(dc):
+    image = 'minio/minio:RELEASE.2021-08-05T22-01-19Z'
+    command = 'server --address 0.0.0.0:9000 --console-address 0.0.0.0:9001 /data'
+    name = '%s-%s' % (ioxperf_name, 'minio')
+    ports = {'9000/tcp': 9000, '9001/tcp': 9001}
+    volumes = {os.path.join(os.getcwd(), 'volumes/minio'): {
+        'bind': '/data',
+        'mode': 'rw',
+    }}
+    env = {
+        'MINIO_ROOT_USER': 'minio',
+        'MINIO_ROOT_PASSWORD': 'miniominio',
+        'MINIO_PROMETHEUS_AUTH_TYPE': 'public',
+        'MINIO_HTTP_TRACE': '/dev/stdout',
+    }
+    docker_pull_image_if_needed(dc, image)
+    container = dc.containers.run(image=image, command=command, detach=True, environment=env, name=name,
+                                  hostname='minio', labels=ioxperf_labels, network=ioxperf_name, ports=ports,
+                                  volumes=volumes)
+    docker_wait_container_running(container)
+
+    while True:
+        timeout = urllib3.util.Timeout(connect=0.1, read=0.1)
+        http_client = urllib3.PoolManager(num_pools=1, timeout=timeout, retries=False)
+        try:
+            mc = minio.Minio(endpoint='127.0.0.1:9000', access_key='minio', secret_key='miniominio', secure=False,
+                             http_client=http_client)
+            if not mc.bucket_exists('iox1'):
+                mc.make_bucket('iox1')
+            if not mc.bucket_exists('iox2'):
+                mc.make_bucket('iox2')
+            break
+        except (urllib3.exceptions.ProtocolError, urllib3.exceptions.TimeoutError, minio.error.S3Error):
+            pass
+        print('waiting for Minio to become ready')
+        time.sleep(0.5)
+
+    pipe_container_logs_to_file(container, 'minio.log')
+    print('Minio service ready')
+
+    return container
+
+
+def docker_run_jaeger(dc):
+    image = 'jaegertracing/all-in-one:1.25.0'
+    name = '%s-%s' % (ioxperf_name, 'jaeger')
+    ports = {'16686/tcp': 16686}
+    docker_pull_image_if_needed(dc, image)
+    container = dc.containers.run(image=image, detach=True, name=name, hostname='jaeger', labels=ioxperf_labels,
+                                  network=ioxperf_name, ports=ports)
+    docker_wait_container_running(container)
+
+    while True:
+        try:
+            # any 2xx response means Jaeger is up
+            if requests.get(url='http://127.0.0.1:16686/search', timeout=0.1).status_code // 100 == 2:
+                break
+        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
+            pass
+        print('waiting for Jaeger to become ready')
+        time.sleep(0.1)
+
+    pipe_container_logs_to_file(container, 'jaeger.log')
+    print('Jaeger service ready')
+
+    return container
+
+
+def cargo_build_iox(debug=False):
+    t = time.time()
+    print('building IOx')
+
+    args = ['cargo', 'build']
+    if not debug:
+        args += ['--release']
+    args += ['--package', 'influxdb_iox', '--features', 'aws,jaeger', '--bin', 'influxdb_iox']
+    args += ['--package', 'iox_data_generator', '--bin', 'iox_data_generator', '--bin', 'create_database']
+    # check=True aborts the run rather than continuing with stale binaries
+    subprocess.run(args=args, check=True)
+
+    print('building IOx finished in %.2fs' % (time.time() - t))
+
+
+def exec_iox(id, service_name, debug=False, object_store='memory', print_only=False):
+    http_addr = 'localhost:%d' % (id * 10000 + 8080)
+    grpc_addr = 'localhost:%d' % (id * 10000 + 8082)
+
+    if debug:
+        iox_path = os.path.abspath(os.path.join(os.getcwd(), '../target/debug/influxdb_iox'))
+    else:
+        iox_path = os.path.abspath(os.path.join(os.getcwd(), '../target/release/influxdb_iox'))
+    args = [iox_path, 'run']
+    env = {
+        'INFLUXDB_IOX_ID': str(id),
+        'INFLUXDB_IOX_BIND_ADDR': http_addr,
+        'INFLUXDB_IOX_GRPC_BIND_ADDR': grpc_addr,
+        'INFLUXDB_IOX_BUCKET': 'iox%d' % id,
+        'LOG_DESTINATION': 'stdout',
+        'LOG_FORMAT': 'full',
+        'TRACES_EXPORTER': 'jaeger',
+        'TRACES_EXPORTER_JAEGER_AGENT_HOST': 'localhost',
+        'TRACES_EXPORTER_JAEGER_AGENT_PORT': '6831',
+        'TRACES_EXPORTER_JAEGER_SERVICE_NAME': service_name,
+        'TRACES_SAMPLER': 'always_on',
+        'RUST_BACKTRACE': '1',
+        'LOG_FILTER': 'debug,lifecycle=info,rusoto_core=warn,hyper=warn,h2=warn',
+    }
+
+    if object_store == 'memory':
+        env['INFLUXDB_IOX_OBJECT_STORE'] = 'memory'
+    elif object_store == 's3':
+        env['INFLUXDB_IOX_OBJECT_STORE'] = 's3'
+        env['AWS_ACCESS_KEY_ID'] = 'minio'
+        env['AWS_SECRET_ACCESS_KEY'] = 'miniominio'
+        env['AWS_ENDPOINT'] = 'http://localhost:9000'
+    elif object_store == 'file':
+        env['INFLUXDB_IOX_OBJECT_STORE'] = 'file'
+        env['INFLUXDB_IOX_DB_DIR'] = 'volumes/%s' % service_name
+    else:
+        raise ValueError('invalid object_store value "%s"' % object_store)
+
+    if print_only:
+        print()
+        for k in sorted(env.keys()):
+            print('%s=%s' % (k, env[k]))
+        print(' '.join(args))
+        print()
+        return None
+
+    log_file = open('logs/%s.log' % service_name, mode='w')
+    process = subprocess.Popen(args=args, env=env, stdout=log_file, stderr=log_file)
+
+    while True:
+        if process.poll() is not None:
+            raise ChildProcessError('service %s stopped unexpectedly, check %s' % (service_name, log_file.name))
+        router = grpc_requests.Client(grpc_addr, lazy=True)
+        # retry until the management service is registered and the server reports initialized
+        try:
+            router.register_service('influxdata.iox.management.v1.ManagementService')
+            server_status_response = router.request('influxdata.iox.management.v1.ManagementService', 'GetServerStatus', None)
+            if 'server_status' in server_status_response and server_status_response['server_status']['initialized'] is True:
+                break
+        except Exception:
+            # server not ready yet; fall through to retry
+            pass
+
+        print('waiting for %s to become ready' % service_name)
+        time.sleep(0.1)
+
+    print('%s service ready' % service_name)
+
+    return process
+
+
+def grpc_create_database(router_id, writer_id):
+    print('creating database "%s" on both IOx servers' % db_name)
+
+    router_db_rules = {
+        'rules': {
+            'name': db_name,
+            'partition_template': {
+                'parts': [
+                    {'time': '%Y-%m-%d %H:00:00'},
+                ],
+            },
+            'lifecycle_rules': {
+                'immutable': True,
+                'worker_backoff_millis': '1000',
+                'catalog_transactions_until_checkpoint': '100',
+                'late_arrive_window_seconds': 300,
+                'persist_row_threshold': '1000000',
+                'persist_age_threshold_seconds': 1800,
+                'mub_row_threshold': '100000',
+                'max_active_compactions_cpu_fraction': 1.0,
+            },
+            'routing_config': {'sink': {'kafka': {}}},
+            'worker_cleanup_avg_sleep': '500s',
+            'writing': '127.0.0.1:9093',
+        },
+    }
+
+    writer_db_rules = {
+        'rules': {
+            'name': db_name,
+            'partition_template': {
+                'parts': [
+                    {'time': '%Y-%m-%d %H:00:00'}
+                ],
+            },
+            'lifecycle_rules': {
+                'buffer_size_soft': 1024 * 1024 * 1024,
+                'buffer_size_hard': 1024 * 1024 * 1024 * 2,
+                'worker_backoff_millis': 100,
+                'max_active_compactions': 1,
+                'persist': True,
+                'persist_row_threshold': 10000000,
+                'catalog_transactions_until_checkpoint': 100,
+                'late_arrive_window_seconds': 300,
+                'persist_age_threshold_seconds': 1800,
+                'mub_row_threshold': 100000,
+            },
+            'routing_config': {'sink': {'kafka': {}}},
+            'worker_cleanup_avg_sleep': '500s',
+            'reading': '127.0.0.1:9093',
+        },
+    }
+
+    if router_id is not None:
+        router_grpc_addr = 'localhost:%d' % (router_id * 10000 + 8082)
+        router = grpc_requests.Client(router_grpc_addr, lazy=True)
+        router.register_service('influxdata.iox.management.v1.ManagementService')
+        router.request('influxdata.iox.management.v1.ManagementService', 'CreateDatabase', router_db_rules)
+
+        router_http_addr = 'localhost:%d' % (router_id * 10000 + 8080)
+        router_write_url = 'http://%s/api/v2/write?org=%s&bucket=%s' % (router_http_addr, org_name, bucket_name)
+        lp = "sentinel,source=perf.py f=1i"
+        response = requests.post(url=router_write_url, data=lp, timeout=10)
+        if not response.ok:
+            print('failed to write to router')
+            print(response.reason)
+            print(response.content)
+            return
+
+    else:
+        print()
+        print(router_db_rules)
+        print()
+
+    if writer_id is not None:
+        writer_grpc_addr = 'localhost:%d' % (writer_id * 10000 + 8082)
+        writer = grpc_requests.Client(writer_grpc_addr, lazy=True)
+        writer.register_service('influxdata.iox.management.v1.ManagementService')
+        writer.request('influxdata.iox.management.v1.ManagementService', 'CreateDatabase', writer_db_rules)
+
+        writer_http_addr = 'localhost:%d' % (writer_id * 10000 + 8080)
+        writer_query_url = 'http://%s/iox/api/v1/databases/%s/query' % (writer_http_addr, db_name)
+        writer_query_params = {'q': 'select count(1) from sentinel'}
+
+        response = requests.get(url=writer_query_url, params=writer_query_params, timeout=10)
+        for i in range(20):
+            if response.ok:
+                break
+            print('waiting for round trip test to succeed')
+            time.sleep(0.5)
+            response = requests.get(url=writer_query_url, params=writer_query_params, timeout=10)
+
+        if not response.ok:
+            print(response.reason)
+            print(response.content)
+            return
+
+    else:
+        print()
+        print(writer_db_rules)
+        print()
+
+    print('created database "%s" on both IOx servers' % db_name)
+
+
+def run_test_battery(battery_name, router_id, writer_id, debug=False):
+    print('starting test battery "%s"' % battery_name)
+
+    # Write
+
+    battery_dir = os.path.join(os.getcwd(), battery_name)
+    datagen_filename = os.path.join(battery_dir, 'datagen.toml')
+    if debug:
+        iox_data_generator_path = os.path.abspath(os.path.join(os.getcwd(), '../target/debug/iox_data_generator'))
+    else:
+        iox_data_generator_path = os.path.abspath(os.path.join(os.getcwd(), '../target/release/iox_data_generator'))
+
+    router_http_addr = 'localhost:%d' % (router_id * 10000 + 8080)
+    args = [iox_data_generator_path,
+            '--host', router_http_addr, '--token', 'arbitrary',
+            '--org', org_name, '--bucket', bucket_name,
+            '--spec', datagen_filename]
+    env = {
+        'RUST_BACKTRACE': '0',
+    }
+    log_file = open('logs/test.log', mode='w')
+    if subprocess.run(args=args, stdout=log_file, stderr=log_file, env=env).returncode != 0:
+        raise ChildProcessError(
+            'failed to run iox_data_generator for battery "%s", check %s' % (battery_name, log_file.name))
+
+    # Query
+
+    writer_http_addr = 'localhost:%d' % (writer_id * 10000 + 8080)
+    query_url = 'http://%s/iox/api/v1/databases/%s/query' % (writer_http_addr, db_name)
+    queries_filename = os.path.join(battery_dir, 'queries.toml')
+    queries = toml.load(open(queries_filename))
+
+    for query in queries['queries']:
+        if 'sql' not in query:
+            print('query missing SQL query')
+            print(query)
+            print()
+            continue
+        sql = query['sql']
+        # 'name' is optional; fall back to the SQL text
+        name = query.get('name')
+        if name is None:
+            name = sql
+
+        print('running test "%s"' % name)
+        time_start = time.time()
+        params = {'q': sql, 'format': 'csv'}
+        response = requests.get(url=query_url, params=params)
+        time_delta = '%dms' % math.floor((time.time() - time_start) * 1000)
+
+        if not response.ok:
+            print(response.reason)
+            print(response.text)
+            print()
+            continue
+
+        got = response.text.strip()
+        print('time: %s' % time_delta)
+        if 'expect' in query:
+            expect = query['expect'].strip()
+            if expect != got:
+                print('expected: %s' % expect)
+                print('got: %s' % got)
+            else:
+                print('OK')
+
+        elif 'expect_filename' in query:
+            path = pathlib.Path(os.path.join(battery_dir, query['expect_filename']))
+            if not path.is_file():
+                print('file "%s" not found' % path)
+                print()
+                continue
+            expect = open(path).read().strip()
+            if expect != got:
+                print('expected: %s' % expect)
+                print('got: %s' % got)
+            else:
+                print('OK')
+        else:
+            print('OK')
+
+        print()
+
+    print('completed test battery "%s"' % battery_name)
+
+
+if __name__ == "__main__":
+    logging.getLogger('grpc_requests.client').setLevel(logging.ERROR)
+    main()
diff --git a/perf/requirements.txt b/perf/requirements.txt
new file mode 100644
index 0000000000..c638092166
--- /dev/null
+++ b/perf/requirements.txt
@@ -0,0 +1,9 @@
+docker>=5.0,<6
+grpc-requests>=0.1,<0.2
+grpcio>=1.39,<1.40
+grpcio-reflection>=1.39,<1.40
+protobuf>=3.17,<3.18
+minio
+requests
+toml
+urllib3