#!/usr/bin/env python3
import argparse
import glob
import logging
import math
import os
import pathlib
import shutil
import signal
import socket
import subprocess
import threading
import time

import docker
import grpc_requests
import minio
import requests
import toml
import urllib3

ioxperf_name = "ioxperf"
ioxperf_labels = {ioxperf_name: None}
ioxperf_filters = {'label': ioxperf_name}
org_name = 'myorg'
bucket_name = 'mybucket'
db_name = '%s_%s' % (org_name, bucket_name)


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--skip-build', help='do not build IOx, execute existing binaries', action='store_true')
    parser.add_argument('--debug', help='build/execute debug IOx binaries instead of release', action='store_true')
    parser.add_argument('--object-store', help='object store type', default='s3', choices=('memory', 's3', 'file'))
    parser.add_argument('--kafka-zookeeper', help='use Kafka/ZooKeeper instead of Redpanda', action='store_true')
    parser.add_argument('--hold', help='keep all services running after tests complete', action='store_true')
    parser.add_argument('--cleanup', help='remove Docker assets and exit (TODO terminate IOx processes)',
                        action='store_true')
    parser.add_argument('batteries', help='name of directories containing test batteries, or "all"', nargs='*')
    args = parser.parse_args()

    os.chdir(os.path.dirname(os.path.abspath(__file__)))
    dc = docker.from_env()

    if args.cleanup:
        docker_cleanup_resources(dc)
        return
    cleanup_logs_and_volumes(dc)

    batteries = args.batteries
    if batteries == ['all']:
        batteries = (
            p.relative_to(os.getcwd())
            for p in pathlib.Path(os.getcwd()).iterdir()
            if p.joinpath('datagen.toml').is_file()
        )
    else:
        for battery in batteries:
            p = pathlib.Path(os.getcwd()).joinpath(battery, 'datagen.toml')
            if not p.is_file():
                print('invalid battery "%s" - does not contain datagen.toml' % battery)
                exit(1)

    processes = {}
    try:
        if not args.skip_build:
            cargo_build_iox(args.debug)
        docker_create_network(dc)
        if args.kafka_zookeeper:
            docker_run_zookeeper(dc)
            docker_run_kafka(dc)
        else:
            docker_run_redpanda(dc)
        if args.object_store == 's3':
            docker_run_minio(dc)
        docker_run_jaeger(dc)
        processes['iox_router'] = exec_iox(1, 'iox_router', debug=args.debug, object_store=args.object_store)
        processes['iox_writer'] = exec_iox(2, 'iox_writer', debug=args.debug, object_store=args.object_store)
        grpc_create_database(1, 2)

        print('-' * 40)
        for battery in batteries:
            run_test_battery(battery, 1, 2, debug=args.debug)
            print('-' * 40)
    except Exception as e:
        print(e)

    if args.hold:
        print('subprocesses are still running, ctrl-C to terminate and exit')
        try:
            signal.pause()
        except KeyboardInterrupt:
            pass
        print('-' * 40)

    for service_name, process in processes.items():
        if process is None:
            continue
        print('%s <- SIGTERM' % service_name)
        process.send_signal(signal.SIGTERM)
        try:
            exit_code = process.wait(timeout=1.0)
        except subprocess.TimeoutExpired:
            exit_code = None
        if exit_code is None:
            print('%s <- SIGKILL' % service_name)
            process.send_signal(signal.SIGKILL)
            exit_code = process.wait()
        if exit_code != 0:
            print('%s exited with %d' % (service_name, exit_code))

    docker_cleanup_resources(dc)


def docker_cleanup_resources(dc):
    containers = dc.containers.list(all=True, filters=ioxperf_filters)
    if len(containers) > 0:
        print('removing containers: %s' % ', '.join((c.name for c in containers)))
        for container in containers:
            container.remove(v=True, force=True)

    networks = dc.networks.list(filters=ioxperf_filters)
    if len(networks) > 0:
        print('removing networks: %s' % ', '.join((n.name for n in networks)))
        for network in networks:
            network.remove()


def cleanup_logs_and_volumes(dc):
    docker_cleanup_resources(dc)

    volume_paths = glob.glob(os.path.join(os.getcwd(), 'volumes', '*'))
    if len(volume_paths) > 0:
        print('removing volume contents: %s' % ', '.join((os.path.relpath(p) for p in volume_paths)))
        for path in volume_paths:
            shutil.rmtree(path)

    log_paths = glob.glob(os.path.join(os.getcwd(), 'logs', '*'))
    if len(log_paths) > 0:
        print('removing logs: %s' % ', '.join((os.path.relpath(p) for p in log_paths)))
        for path in log_paths:
            os.unlink(path)
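

# Docker helpers. Every container and the bridge network are labelled with
# "ioxperf" so docker_cleanup_resources() can find and remove them later, and
# each container's output is streamed into ./logs/ by a daemon thread.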
def docker_create_network(dc):
    dc.networks.create(name=ioxperf_name, driver='bridge', check_duplicate=True, scope='local', labels=ioxperf_labels)


def docker_pull_image_if_needed(dc, image):
    try:
        dc.images.get(image)
    except docker.errors.ImageNotFound:
        print("pulling image '%s'..." % image)
        dc.images.pull(image)


def docker_wait_container_running(container):
    while True:
        container.reload()
        if container.status == 'running':
            print("container '%s' has started" % container.name)
            return
        elif container.status == 'created':
            print("waiting for container '%s' to start" % container.name)
            time.sleep(0.1)
            continue
        raise Exception("container '%s' status '%s' unexpected" % (container.name, container.status))


def pipe_container_logs_to_file(container, log_filename):
    dir_path = pathlib.Path(os.getcwd(), 'logs')
    if not dir_path.exists():
        os.mkdir(dir_path, mode=0o777)
    logs = container.logs(stdout=True, stderr=True, stream=True, follow=True)
    f = open(file=os.path.join(os.getcwd(), 'logs', log_filename), mode='wb', buffering=0)

    def thread_function():
        for entry in logs:
            f.write(entry)
            f.flush()
        f.close()

    threading.Thread(target=thread_function, daemon=True).start()


def check_port_open(addr, port):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    port_open = sock.connect_ex((addr, port)) == 0
    sock.close()
    return port_open


def docker_run_redpanda(dc):
    image = 'vectorized/redpanda:v21.7.6'
    command = ['redpanda', 'start', '--overprovisioned', '--smp', '1', '--memory', '128M', '--reserve-memory', '0M',
               '--node-id', '0', '--check=false',
               '--kafka-addr', 'CLIENT://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093',
               '--advertise-kafka-addr', 'CLIENT://kafka:9092,EXTERNAL://localhost:9093']
    name = '%s-%s' % (ioxperf_name, 'redpanda')
    ports = {'9093/tcp': 9093}
    volumes = {os.path.join(os.getcwd(), 'volumes/redpanda'): {
        'bind': '/var/lib/redpanda/data',
        'mode': 'rw',
    }}
    docker_pull_image_if_needed(dc, image)
    container = dc.containers.run(image=image, command=command, detach=True, name=name, hostname='kafka',
                                  labels=ioxperf_labels, network=ioxperf_name, ports=ports, volumes=volumes)
    docker_wait_container_running(container)

    while True:
        if check_port_open('127.0.0.1', 9093):
            break
        print('waiting for Redpanda to become ready')
        time.sleep(0.1)

    pipe_container_logs_to_file(container, 'redpanda.log')

    print('Redpanda service is ready')
    return container


def docker_run_zookeeper(dc):
    image = 'docker.io/bitnami/zookeeper:3'
    name = '%s-%s' % (ioxperf_name, 'zookeeper')
    ports = {'2181/tcp': 2181}
    env = {
        'ALLOW_ANONYMOUS_LOGIN': 'yes',
    }
    volumes = {os.path.join(os.getcwd(), 'volumes/zookeeper'): {
        'bind': '/bitnami/zookeeper',
        'mode': 'rw',
    }}
    docker_pull_image_if_needed(dc, image)
    container = dc.containers.run(image=image, detach=True, environment=env, name=name, hostname='zookeeper',
                                  labels=ioxperf_labels, network=ioxperf_name, ports=ports, volumes=volumes)
    docker_wait_container_running(container)

    while True:
        if check_port_open('127.0.0.1', 2181):
            break
        print('waiting for ZooKeeper to become ready')
        time.sleep(0.1)

    pipe_container_logs_to_file(container, 'zookeeper.log')

    print('ZooKeeper service is ready')
    return container
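

# Kafka/ZooKeeper variant (--kafka-zookeeper): the Bitnami Kafka broker joins the
# ZooKeeper container started above and advertises the same EXTERNAL listener
# (localhost:9093) that the Redpanda container advertises, so the IOx database
# rules work unchanged with either write buffer.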
def docker_run_kafka(dc):
    image = 'docker.io/bitnami/kafka:2'
    name = '%s-%s' % (ioxperf_name, 'kafka')
    ports = {'9093/tcp': 9093}
    env = {
        'KAFKA_CFG_ZOOKEEPER_CONNECT': 'zookeeper:2181',
        'ALLOW_PLAINTEXT_LISTENER': 'yes',
        'KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP': 'CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT',
        'KAFKA_CFG_LISTENERS': 'CLIENT://:9092,EXTERNAL://:9093',
        'KAFKA_CFG_ADVERTISED_LISTENERS': 'CLIENT://kafka:9092,EXTERNAL://localhost:9093',
        'KAFKA_INTER_BROKER_LISTENER_NAME': 'CLIENT',
        'KAFKA_CFG_LOG_RETENTION_CHECK_INTERVAL_MS': '100',
    }
    volumes = {os.path.join(os.getcwd(), 'volumes/kafka'): {
        'bind': '/bitnami/kafka',
        'mode': 'rw',
    }}
    docker_pull_image_if_needed(dc, image)
    container = dc.containers.run(image=image, detach=True, environment=env, name=name, hostname='kafka',
                                  labels=ioxperf_labels, network=ioxperf_name, ports=ports, volumes=volumes)
    docker_wait_container_running(container)

    while True:
        if check_port_open('127.0.0.1', 9093):
            break
        print('waiting for Kafka to become ready')
        time.sleep(0.1)

    pipe_container_logs_to_file(container, 'kafka.log')

    print('Kafka service is ready')
    return container


def docker_run_minio(dc):
    image = 'minio/minio:RELEASE.2021-08-05T22-01-19Z'
    command = 'server --address 0.0.0.0:9000 --console-address 0.0.0.0:9001 /data'
    name = '%s-%s' % (ioxperf_name, 'minio')
    ports = {'9000/tcp': 9000, '9001/tcp': 9001}
    volumes = {os.path.join(os.getcwd(), 'volumes/minio'): {
        'bind': '/data',
        'mode': 'rw',
    }}
    env = {
        'MINIO_ROOT_USER': 'minio',
        'MINIO_ROOT_PASSWORD': 'miniominio',
        'MINIO_PROMETHEUS_AUTH_TYPE': 'public',
        'MINIO_HTTP_TRACE': '/dev/stdout',
    }
    docker_pull_image_if_needed(dc, image)
    container = dc.containers.run(image=image, command=command, detach=True, environment=env, name=name,
                                  hostname='minio', labels=ioxperf_labels, network=ioxperf_name, ports=ports,
                                  volumes=volumes)
    docker_wait_container_running(container)

    while True:
        timeout = urllib3.util.Timeout(connect=0.1, read=0.1)
        http_client = urllib3.PoolManager(num_pools=1, timeout=timeout, retries=False)
        try:
            mc = minio.Minio(endpoint='127.0.0.1:9000', access_key='minio', secret_key='miniominio', secure=False,
                             http_client=http_client)
            if not mc.bucket_exists('iox1'):
                mc.make_bucket('iox1')
            if not mc.bucket_exists('iox2'):
                mc.make_bucket('iox2')
            break
        except (urllib3.exceptions.ProtocolError, urllib3.exceptions.TimeoutError, minio.error.S3Error):
            pass
        print('waiting for Minio to become ready')
        time.sleep(0.5)

    pipe_container_logs_to_file(container, 'minio.log')

    print('Minio service ready')
    return container


def docker_run_jaeger(dc):
    image = 'jaegertracing/all-in-one:1.25.0'
    name = '%s-%s' % (ioxperf_name, 'jaeger')
    ports = {'16686/tcp': 16686}
    docker_pull_image_if_needed(dc, image)
    container = dc.containers.run(image=image, detach=True, name=name, hostname='jaeger', labels=ioxperf_labels,
                                  network=ioxperf_name, ports=ports)
    docker_wait_container_running(container)

    while True:
        try:
            if requests.get(url='http://127.0.0.1:16686/search', timeout=0.1).status_code // 100 == 2:
                break
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
            pass
        print('waiting for Jaeger to become ready')
        time.sleep(0.1)

    pipe_container_logs_to_file(container, 'jaeger.log')

    print('Jaeger service ready')
    return container


def cargo_build_iox(debug=False):
    t = time.time()
    print('building IOx')

    args = ['cargo', 'build']
    if not debug:
        args += ['--release']
    args += ['--package', 'influxdb_iox', '--features', 'aws,jaeger', '--bin', 'influxdb_iox']
    args += ['--package', 'iox_data_generator', '--bin', 'iox_data_generator', '--bin', 'create_database']
    subprocess.run(args=args)

    print('building IOx finished in %.2fs' % (time.time() - t))
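

# exec_iox starts one influxdb_iox server process. Ports are derived from the
# server id: id 1 (the router) listens on localhost:18080/18082 and id 2 (the
# writer) on localhost:28080/28082 (HTTP/gRPC respectively). Readiness is polled
# through the management gRPC service until the server reports initialized.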
def exec_iox(id, service_name, debug=False, object_store='memory', print_only=False):
    http_addr = 'localhost:%d' % (id * 10000 + 8080)
    grpc_addr = 'localhost:%d' % (id * 10000 + 8082)
    if debug:
        iox_path = os.path.abspath(os.path.join(os.getcwd(), '../target/debug/influxdb_iox'))
    else:
        iox_path = os.path.abspath(os.path.join(os.getcwd(), '../target/release/influxdb_iox'))
    args = [iox_path, 'run']
    env = {
        'INFLUXDB_IOX_ID': str(id),
        'INFLUXDB_IOX_BIND_ADDR': http_addr,
        'INFLUXDB_IOX_GRPC_BIND_ADDR': grpc_addr,
        'INFLUXDB_IOX_BUCKET': 'iox%d' % id,
        'LOG_DESTINATION': 'stdout',
        'LOG_FORMAT': 'full',
        'TRACES_EXPORTER': 'jaeger',
        'TRACES_EXPORTER_JAEGER_AGENT_HOST': 'localhost',
        'TRACES_EXPORTER_JAEGER_AGENT_PORT': '6831',
        'TRACES_EXPORTER_JAEGER_SERVICE_NAME': service_name,
        'TRACES_SAMPLER': 'always_on',
        'RUST_BACKTRACE': '1',
        'LOG_FILTER': 'debug,lifecycle=info,rusoto_core=warn,hyper=warn,h2=warn',
    }
    if object_store == 'memory':
        env['INFLUXDB_IOX_OBJECT_STORE'] = 'memory'
    elif object_store == 's3':
        env['INFLUXDB_IOX_OBJECT_STORE'] = 's3'
        env['AWS_ACCESS_KEY_ID'] = 'minio'
        env['AWS_SECRET_ACCESS_KEY'] = 'miniominio'
        env['AWS_ENDPOINT'] = 'http://localhost:9000'
    elif object_store == 'file':
        env['INFLUXDB_IOX_OBJECT_STORE'] = 'file'
        env['INFLUXDB_IOX_DB_DIR'] = 'volumes/%s' % service_name
    else:
        raise ValueError('invalid object_store value "%s"' % object_store)

    if print_only:
        print()
        for k in sorted(env.keys()):
            print('%s=%s' % (k, env[k]))
        print(' '.join(args))
        print()
        return None

    log_file = open('logs/%s.log' % service_name, mode='w')
    process = subprocess.Popen(args=args, env=env, stdout=log_file, stderr=log_file)

    while True:
        if process.poll() is not None:
            raise ChildProcessError('service %s stopped unexpectedly, check %s' % (service_name, log_file.name))

        router = grpc_requests.Client(grpc_addr, lazy=True)
        while True:
            try:
                router.register_service('influxdata.iox.management.v1.ManagementService')
                break
            except:
                # fall through to retry
                pass

        try:
            server_status_response = router.request('influxdata.iox.management.v1.ManagementService',
                                                    'GetServerStatus', None)
            if 'server_status' in server_status_response and \
                    server_status_response['server_status']['initialized'] is True:
                break
        except:
            # fall through to retry
            pass

        print('waiting for %s to become ready' % service_name)
        time.sleep(0.1)

    print('%s service ready' % service_name)
    return process
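

# Database rules for the two servers: the router keeps an immutable in-memory
# buffer and routes incoming writes into the Kafka/Redpanda topic ('writing'),
# while the writer consumes the same topic ('reading') and persists data
# according to its lifecycle rules. A sentinel point written to the router and
# queried back from the writer verifies the round trip before tests start.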
def grpc_create_database(router_id, writer_id):
    print('creating database "%s" on both IOx servers' % db_name)

    router_db_rules = {
        'rules': {
            'name': db_name,
            'partition_template': {
                'parts': [
                    {'time': '%Y-%m-%d %H:00:00'},
                ],
            },
            'lifecycle_rules': {
                'immutable': True,
                'worker_backoff_millis': '1000',
                'catalog_transactions_until_checkpoint': '100',
                'late_arrive_window_seconds': 300,
                'persist_row_threshold': '1000000',
                'persist_age_threshold_seconds': 1800,
                'mub_row_threshold': '100000',
                'max_active_compactions_cpu_fraction': 1.0,
            },
            'routing_config': {'sink': {'kafka': {}}},
            'worker_cleanup_avg_sleep': '500s',
            'writing': '127.0.0.1:9093',
        },
    }
    writer_db_rules = {
        'rules': {
            'name': db_name,
            'partition_template': {
                'parts': [
                    {'time': '%Y-%m-%d %H:00:00'}
                ],
            },
            'lifecycle_rules': {
                'buffer_size_soft': 1024 * 1024 * 1024,
                'buffer_size_hard': 1024 * 1024 * 1024 * 2,
                'worker_backoff_millis': 100,
                'max_active_compactions': 1,
                'persist': True,
                'persist_row_threshold': 10000000,
                'catalog_transactions_until_checkpoint': 100,
                'late_arrive_window_seconds': 300,
                'persist_age_threshold_seconds': 1800,
                'mub_row_threshold': 100000,
            },
            'routing_config': {'sink': {'kafka': {}}},
            'worker_cleanup_avg_sleep': '500s',
            'reading': '127.0.0.1:9093',
        },
    }

    if router_id is not None:
        router_grpc_addr = 'localhost:%d' % (router_id * 10000 + 8082)
        router = grpc_requests.Client(router_grpc_addr, lazy=True)
        router.register_service('influxdata.iox.management.v1.ManagementService')
        router.request('influxdata.iox.management.v1.ManagementService', 'CreateDatabase', router_db_rules)

        router_http_addr = 'localhost:%d' % (router_id * 10000 + 8080)
        router_write_url = 'http://%s/api/v2/write?org=%s&bucket=%s' % (router_http_addr, org_name, bucket_name)
        lp = "sentinel,source=perf.py f=1i"
        response = requests.post(url=router_write_url, data=lp, timeout=10)
        if not response.ok:
            print('failed to write to router')
            print(response.reason)
            print(response.content)
            return
    else:
        print()
        print(router_db_rules)
        print()

    if writer_id is not None:
        writer_grpc_addr = 'localhost:%d' % (writer_id * 10000 + 8082)
        writer = grpc_requests.Client(writer_grpc_addr, lazy=True)
        writer.register_service('influxdata.iox.management.v1.ManagementService')
        writer.request('influxdata.iox.management.v1.ManagementService', 'CreateDatabase', writer_db_rules)

        writer_http_addr = 'localhost:%d' % (writer_id * 10000 + 8080)
        writer_query_url = 'http://%s/iox/api/v1/databases/%s/query' % (writer_http_addr, db_name)
        writer_query_params = {'q': 'select count(1) from sentinel'}
        response = requests.get(url=writer_query_url, params=writer_query_params, timeout=10)
        for i in range(20):
            if response.ok:
                break
            print('waiting for round trip test to succeed')
            time.sleep(0.5)
            response = requests.get(url=writer_query_url, params=writer_query_params, timeout=10)
        if not response.ok:
            print(response.reason)
            print(response.content)
            return
    else:
        print()
        print(writer_db_rules)
        print()

    print('created database "%s" on both IOx servers' % db_name)
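

# A test battery is a directory containing datagen.toml (the write workload fed
# to iox_data_generator against the router) and queries.toml (SQL queries run
# against the writer, each optionally checked against an inline 'expect' value
# or an 'expect_filename' CSV file in the same directory).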
def run_test_battery(battery_name, router_id, writer_id, debug=False):
    print('starting test battery "%s"' % battery_name)

    # Write
    battery_dir = os.path.join(os.getcwd(), battery_name)
    datagen_filename = os.path.join(battery_dir, 'datagen.toml')
    if debug:
        iox_data_generator_path = os.path.abspath(os.path.join(os.getcwd(), '../target/debug/iox_data_generator'))
    else:
        iox_data_generator_path = os.path.abspath(os.path.join(os.getcwd(), '../target/release/iox_data_generator'))
    router_http_addr = 'localhost:%d' % (router_id * 10000 + 8080)
    args = [iox_data_generator_path,
            '--host', router_http_addr,
            '--token', 'arbitrary',
            '--org', org_name,
            '--bucket', bucket_name,
            '--spec', datagen_filename]
    env = {
        'RUST_BACKTRACE': '0',
    }
    log_file = open('logs/test.log', mode='w')
    if subprocess.run(args=args, stdout=log_file, stderr=log_file, env=env).returncode != 0:
        raise ChildProcessError(
            'failed to run iox_data_generator for battery "%s", check %s' % (battery_name, log_file.name))

    # Query
    writer_http_addr = 'localhost:%d' % (writer_id * 10000 + 8080)
    query_url = 'http://%s/iox/api/v1/databases/%s/query' % (writer_http_addr, db_name)
    queries_filename = os.path.join(battery_dir, 'queries.toml')
    queries = toml.load(open(queries_filename))

    for query in queries['queries']:
        if 'sql' not in query:
            print('query missing SQL query')
            print(query)
            print()
            continue
        sql = query['sql']
        name = query.get('name')
        if name is None:
            name = sql
        print('running test "%s"' % name)

        time_start = time.time()
        params = {'q': sql, 'format': 'csv'}
        response = requests.get(url=query_url, params=params)
        time_delta = '%dms' % math.floor((time.time() - time_start) * 1000)
        if not response.ok:
            print(response.reason)
            print(response.text)
            print()
            continue
        got = response.text.strip()
        print('time: %s' % time_delta)

        if 'expect' in query:
            expect = query['expect'].strip()
            if expect != got:
                print('expected: %s' % expect)
                print('got: %s' % got)
            else:
                print('OK')
        elif 'expect_filename' in query:
            path = pathlib.Path(os.path.join(battery_dir, query['expect_filename']))
            if not path.is_file():
                print('file "%s" not found' % path)
                print()
                continue
            expect = open(path).read().strip()
            if expect != got:
                print('expected: %s' % expect)
                print('got: %s' % got)
            else:
                print('OK')
        else:
            print('OK')
        print()

    print('completed test battery "%s"' % battery_name)


if __name__ == "__main__":
    logging.getLogger('grpc_requests.client').setLevel(logging.ERROR)
    main()