[skip ci](shards): fix #1679 (#1681)

Signed-off-by: peng.xu <peng.xu@zilliz.com>
pull/1684/head
XuPeng-SH 2020-03-17 11:06:51 +08:00 committed by GitHub
parent 8644d341c4
commit b520e60b8a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 23 additions and 16 deletions

View File

@ -48,7 +48,7 @@ class Factory(RouterMixin):
db.remove_session()
servers = self.readonly_topo.group_names
logger.info('Available servers: {}'.format(servers))
logger.info('Available servers: {}'.format(list(servers)))
ring = HashRing(servers)
@ -59,8 +59,13 @@ class Factory(RouterMixin):
target_host = ring.get_node(str(f.id))
sub = routing.get(target_host, None)
if not sub:
routing[target_host] = {'table_id': f.table_id, 'file_ids': []}
routing[target_host]['file_ids'].append(str(f.id))
sub = {}
routing[target_host] = sub
kv = sub.get(f.table_id, None)
if not kv:
kv = []
sub[f.table_id] = kv
sub[f.table_id].append(str(f.id))
return routing

View File

@ -130,10 +130,10 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer):
rs = []
all_topk_results = []
def search(addr, query_params, vectors, topk, nprobe, **kwargs):
def search(addr, table_id, file_ids, vectors, topk, nprobe, **kwargs):
logger.info(
'Send Search Request: addr={};params={};nq={};topk={};nprobe={}'
.format(addr, query_params, len(vectors), topk, nprobe))
'Send Search Request: addr={};table_id={};ids={};nq={};topk={};nprobe={}'
.format(addr, table_id, file_ids, len(vectors), topk, nprobe))
conn = self.router.query_conn(addr, metadata=metadata)
start = time.time()
@ -143,8 +143,8 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer):
with self.tracer.start_span('search_{}'.format(addr),
child_of=span):
ret = conn.conn.search_vectors_in_files(table_name=query_params['table_id'],
file_ids=query_params['file_ids'],
ret = conn.conn.search_vectors_in_files(table_name=table_id,
file_ids=file_ids,
query_records=vectors,
top_k=topk,
nprobe=nprobe)
@ -155,14 +155,16 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer):
with self.tracer.start_span('do_search', child_of=p_span) as span:
with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
for addr, params in routing.items():
res = pool.submit(search,
addr,
params,
vectors,
topk,
nprobe,
span=span)
rs.append(res)
for table_id, file_ids in params.items():
res = pool.submit(search,
addr,
table_id,
file_ids,
vectors,
topk,
nprobe,
span=span)
rs.append(res)
for res in rs:
res.result()