refactor: use new router mode in `perf.py`

pull/24376/head
Marco Neumann 2021-11-24 19:02:28 +01:00
parent 5f883a96a6
commit 5532955437
1 changed files with 64 additions and 43 deletions

View File

@ -91,10 +91,22 @@ def main():
docker_run_minio(dc, args.no_volumes)
if do_trace:
docker_run_jaeger(dc)
processes['iox_router'] = exec_iox(1, 'iox_router',
debug=args.debug, object_store=args.object_store, do_trace=do_trace)
processes['iox_writer'] = exec_iox(2, 'iox_writer',
debug=args.debug, object_store=args.object_store, do_trace=do_trace)
processes['iox_router'] = exec_iox(
id=1,
service_name='iox_router',
mode='router',
debug=args.debug,
object_store=args.object_store,
do_trace=do_trace,
)
processes['iox_writer'] = exec_iox(
id=2,
service_name='iox_writer',
mode='database',
debug=args.debug,
object_store=args.object_store,
do_trace=do_trace,
)
grpc_create_database(1, 2)
print('-' * 40)
@ -406,7 +418,7 @@ def cargo_build_iox(debug=False, build_with_aws=True):
print('building IOx finished in %.2fs' % (time.time() - t))
def exec_iox(id, service_name, debug=False, object_store='memory', print_only=False, do_trace=False):
def exec_iox(id, service_name, mode, debug=False, object_store='memory', print_only=False, do_trace=False):
http_addr = 'localhost:%d' % (id * 10000 + 8080)
grpc_addr = 'localhost:%d' % (id * 10000 + 8082)
@ -414,7 +426,7 @@ def exec_iox(id, service_name, debug=False, object_store='memory', print_only=Fa
iox_path = os.path.abspath(os.path.join(os.getcwd(), '../target/debug/influxdb_iox'))
else:
iox_path = os.path.abspath(os.path.join(os.getcwd(), '../target/release/influxdb_iox'))
args = [iox_path, 'run']
args = [iox_path, 'run', mode]
env = {
'INFLUXDB_IOX_ID': str(id),
'INFLUXDB_IOX_BIND_ADDR': http_addr,
@ -459,19 +471,24 @@ def exec_iox(id, service_name, debug=False, object_store='memory', print_only=Fa
while True:
if process.poll() is not None:
raise ChildProcessError('service %s stopped unexpectedly, check %s' % (service_name, log_file.name))
router = grpc_requests.Client(grpc_addr, lazy=True)
while True:
try:
router.register_service('influxdata.iox.management.v1.ManagementService')
break
except:
# fall through to retry
pass
try:
server_status_response = router.request('influxdata.iox.management.v1.ManagementService', 'GetServerStatus',
None)
if 'server_status' in server_status_response and server_status_response['server_status'][
'initialized'] is True:
router.register_service('influxdata.iox.deployment.v1.DeploymentService')
if mode == 'database':
router.register_service('influxdata.iox.management.v1.ManagementService')
server_status_response = router.request(
'influxdata.iox.management.v1.ManagementService',
'GetServerStatus',
None,
)
if (
('server_status' in server_status_response)
and (server_status_response['server_status']['initialized'] is True)
):
break
else:
break
except:
# fall through to retry
@ -488,36 +505,40 @@ def exec_iox(id, service_name, debug=False, object_store='memory', print_only=Fa
def grpc_create_database(router_id, writer_id):
print('creating database "%s" on both IOx servers' % db_name)
router_db_rules = {
'rules': {
router_config = {
'router': {
'name': db_name,
'partition_template': {
'parts': [
{'time': '%Y-%m-%d %H:00:00'},
'write_sharder': {
'specific_targets': [
{
'matcher': {
'table_name_regex': '.*',
},
'shard': 1,
},
],
'hash_ring': None,
},
'lifecycle_rules': {
'immutable': True,
'worker_backoff_millis': '1000',
'catalog_transactions_until_checkpoint': '100',
'late_arrive_window_seconds': 300,
'persist_row_threshold': '1000000',
'persist_age_threshold_seconds': 1800,
'mub_row_threshold': '100000',
'max_active_compactions_cpu_fraction': 1.0,
},
'routing_config': {'sink': {'kafka': {}}},
'worker_cleanup_avg_sleep': '500s',
'write_buffer_connection': {
'direction': 'DIRECTION_WRITE',
'type': 'kafka',
'connection': '127.0.0.1:9093',
'connection_config': {},
'creation_config': {
'n_sequencers': 1,
'options': {},
'write_sinks': {
1: {
'sinks': [
{
'write_buffer': {
'direction': 'DIRECTION_WRITE',
'type': 'kafka',
'connection': '127.0.0.1:9093',
'connection_config': {},
'creation_config': {
'n_sequencers': 1,
'options': {},
},
},
'ignore_errors': False,
},
],
},
},
'query_sinks': None,
},
}
@ -560,7 +581,7 @@ def grpc_create_database(router_id, writer_id):
router_grpc_addr = 'localhost:%d' % (router_id * 10000 + 8082)
router = grpc_requests.Client(router_grpc_addr, lazy=True)
router.register_service('influxdata.iox.management.v1.ManagementService')
router.request('influxdata.iox.management.v1.ManagementService', 'CreateDatabase', router_db_rules)
router.request('influxdata.iox.router.v1.RouterService', 'UpdateRouter', router_config)
router_http_addr = 'localhost:%d' % (router_id * 10000 + 8080)
router_write_url = 'http://%s/api/v2/write?org=%s&bucket=%s' % (router_http_addr, org_name, bucket_name)