From b520e60b8ae060132ea33d47391285deac604a94 Mon Sep 17 00:00:00 2001 From: XuPeng-SH Date: Tue, 17 Mar 2020 11:06:51 +0800 Subject: [PATCH] [skip ci](shards): fix #1679 (#1681) Signed-off-by: peng.xu --- .../plugins/file_based_hash_ring_router.py | 11 ++++++-- shards/mishards/service_handler.py | 28 ++++++++++--------- 2 files changed, 23 insertions(+), 16 deletions(-) diff --git a/shards/mishards/router/plugins/file_based_hash_ring_router.py b/shards/mishards/router/plugins/file_based_hash_ring_router.py index 299335baa7..6d396ba2a4 100644 --- a/shards/mishards/router/plugins/file_based_hash_ring_router.py +++ b/shards/mishards/router/plugins/file_based_hash_ring_router.py @@ -48,7 +48,7 @@ class Factory(RouterMixin): db.remove_session() servers = self.readonly_topo.group_names - logger.info('Available servers: {}'.format(servers)) + logger.info('Available servers: {}'.format(list(servers))) ring = HashRing(servers) @@ -59,8 +59,13 @@ class Factory(RouterMixin): target_host = ring.get_node(str(f.id)) sub = routing.get(target_host, None) if not sub: - routing[target_host] = {'table_id': f.table_id, 'file_ids': []} - routing[target_host]['file_ids'].append(str(f.id)) + sub = {} + routing[target_host] = sub + kv = sub.get(f.table_id, None) + if not kv: + kv = [] + sub[f.table_id] = kv + sub[f.table_id].append(str(f.id)) return routing diff --git a/shards/mishards/service_handler.py b/shards/mishards/service_handler.py index 5612af5333..332c24e717 100644 --- a/shards/mishards/service_handler.py +++ b/shards/mishards/service_handler.py @@ -130,10 +130,10 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): rs = [] all_topk_results = [] - def search(addr, query_params, vectors, topk, nprobe, **kwargs): + def search(addr, table_id, file_ids, vectors, topk, nprobe, **kwargs): logger.info( - 'Send Search Request: addr={};params={};nq={};topk={};nprobe={}' - .format(addr, query_params, len(vectors), topk, nprobe)) + 'Send Search Request: addr={};table_id={};ids={};nq={};topk={};nprobe={}' + .format(addr, table_id, file_ids, len(vectors), topk, nprobe)) conn = self.router.query_conn(addr, metadata=metadata) start = time.time() @@ -143,8 +143,8 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): with self.tracer.start_span('search_{}'.format(addr), child_of=span): - ret = conn.conn.search_vectors_in_files(table_name=query_params['table_id'], - file_ids=query_params['file_ids'], + ret = conn.conn.search_vectors_in_files(table_name=table_id, + file_ids=file_ids, query_records=vectors, top_k=topk, nprobe=nprobe) @@ -155,14 +155,16 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): with self.tracer.start_span('do_search', child_of=p_span) as span: with ThreadPoolExecutor(max_workers=self.max_workers) as pool: for addr, params in routing.items(): - res = pool.submit(search, - addr, - params, - vectors, - topk, - nprobe, - span=span) - rs.append(res) + for table_id, file_ids in params.items(): + res = pool.submit(search, + addr, + table_id, + file_ids, + vectors, + topk, + nprobe, + span=span) + rs.append(res) for res in rs: res.result()