diff --git a/perf/perf.py b/perf/perf.py index 2d7d7e32e0..30b3d40397 100755 --- a/perf/perf.py +++ b/perf/perf.py @@ -91,10 +91,22 @@ def main(): docker_run_minio(dc, args.no_volumes) if do_trace: docker_run_jaeger(dc) - processes['iox_router'] = exec_iox(1, 'iox_router', - debug=args.debug, object_store=args.object_store, do_trace=do_trace) - processes['iox_writer'] = exec_iox(2, 'iox_writer', - debug=args.debug, object_store=args.object_store, do_trace=do_trace) + processes['iox_router'] = exec_iox( + id=1, + service_name='iox_router', + mode='router', + debug=args.debug, + object_store=args.object_store, + do_trace=do_trace, + ) + processes['iox_writer'] = exec_iox( + id=2, + service_name='iox_writer', + mode='database', + debug=args.debug, + object_store=args.object_store, + do_trace=do_trace, + ) grpc_create_database(1, 2) print('-' * 40) @@ -406,7 +418,7 @@ def cargo_build_iox(debug=False, build_with_aws=True): print('building IOx finished in %.2fs' % (time.time() - t)) -def exec_iox(id, service_name, debug=False, object_store='memory', print_only=False, do_trace=False): +def exec_iox(id, service_name, mode, debug=False, object_store='memory', print_only=False, do_trace=False): http_addr = 'localhost:%d' % (id * 10000 + 8080) grpc_addr = 'localhost:%d' % (id * 10000 + 8082) @@ -414,7 +426,7 @@ def exec_iox(id, service_name, debug=False, object_store='memory', print_only=Fa iox_path = os.path.abspath(os.path.join(os.getcwd(), '../target/debug/influxdb_iox')) else: iox_path = os.path.abspath(os.path.join(os.getcwd(), '../target/release/influxdb_iox')) - args = [iox_path, 'run'] + args = [iox_path, 'run', mode] env = { 'INFLUXDB_IOX_ID': str(id), 'INFLUXDB_IOX_BIND_ADDR': http_addr, @@ -459,19 +471,24 @@ def exec_iox(id, service_name, debug=False, object_store='memory', print_only=Fa while True: if process.poll() is not None: raise ChildProcessError('service %s stopped unexpectedly, check %s' % (service_name, log_file.name)) + router = grpc_requests.Client(grpc_addr, lazy=True) - while True: - try: - router.register_service('influxdata.iox.management.v1.ManagementService') - break - except: - # fall through to retry - pass try: - server_status_response = router.request('influxdata.iox.management.v1.ManagementService', 'GetServerStatus', - None) - if 'server_status' in server_status_response and server_status_response['server_status'][ - 'initialized'] is True: + router.register_service('influxdata.iox.deployment.v1.DeploymentService') + + if mode == 'database': + router.register_service('influxdata.iox.management.v1.ManagementService') + server_status_response = router.request( + 'influxdata.iox.management.v1.ManagementService', + 'GetServerStatus', + None, + ) + if ( + ('server_status' in server_status_response) + and (server_status_response['server_status']['initialized'] is True) + ): + break + else: break except: # fall through to retry @@ -488,36 +505,40 @@ def exec_iox(id, service_name, debug=False, object_store='memory', print_only=Fa def grpc_create_database(router_id, writer_id): print('creating database "%s" on both IOx servers' % db_name) - router_db_rules = { - 'rules': { + router_config = { + 'router': { 'name': db_name, - 'partition_template': { - 'parts': [ - {'time': '%Y-%m-%d %H:00:00'}, + 'write_sharder': { + 'specific_targets': [ + { + 'matcher': { + 'table_name_regex': '.*', + }, + 'shard': 1, + }, ], + 'hash_ring': None, }, - 'lifecycle_rules': { - 'immutable': True, - 'worker_backoff_millis': '1000', - 'catalog_transactions_until_checkpoint': '100', - 'late_arrive_window_seconds': 300, - 'persist_row_threshold': '1000000', - 'persist_age_threshold_seconds': 1800, - 'mub_row_threshold': '100000', - 'max_active_compactions_cpu_fraction': 1.0, - }, - 'routing_config': {'sink': {'kafka': {}}}, - 'worker_cleanup_avg_sleep': '500s', - 'write_buffer_connection': { - 'direction': 'DIRECTION_WRITE', - 'type': 'kafka', - 'connection': '127.0.0.1:9093', - 'connection_config': {}, - 'creation_config': { - 'n_sequencers': 1, - 'options': {}, + 'write_sinks': { + 1: { + 'sinks': [ + { + 'write_buffer': { + 'direction': 'DIRECTION_WRITE', + 'type': 'kafka', + 'connection': '127.0.0.1:9093', + 'connection_config': {}, + 'creation_config': { + 'n_sequencers': 1, + 'options': {}, + }, + }, + 'ignore_errors': False, + }, + ], }, }, + 'query_sinks': None, }, } @@ -560,7 +581,7 @@ def grpc_create_database(router_id, writer_id): router_grpc_addr = 'localhost:%d' % (router_id * 10000 + 8082) router = grpc_requests.Client(router_grpc_addr, lazy=True) router.register_service('influxdata.iox.management.v1.ManagementService') - router.request('influxdata.iox.management.v1.ManagementService', 'CreateDatabase', router_db_rules) + router.request('influxdata.iox.router.v1.RouterService', 'UpdateRouter', router_config) router_http_addr = 'localhost:%d' % (router_id * 10000 + 8080) router_write_url = 'http://%s/api/v2/write?org=%s&bucket=%s' % (router_http_addr, org_name, bucket_name)