mirror of https://github.com/milvus-io/milvus.git
Merge branch 'linxj' into 'develop'
format code

See merge request jinhai/vecwise_engine!10
commit 77a0880a5c
@@ -1,10 +1,12 @@
 from engine.retrieval import search_index
 from engine.ingestion import build_index
 from engine.ingestion import serialize
 import numpy as np


 class Singleton(type):
     _instances = {}

     def __call__(cls, *args, **kwargs):
         if cls not in cls._instances:
             cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)

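For readers unfamiliar with the pattern, a minimal self-contained sketch of this metaclass-based singleton. The `return` of the cached instance falls outside the hunk above, so it is assumed here:

    class Singleton(type):
        _instances = {}

        def __call__(cls, *args, **kwargs):
            if cls not in cls._instances:
                cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
            return cls._instances[cls]  # assumed: later calls hand back the cached instance


    class Scheduler(metaclass=Singleton):
        pass


    assert Scheduler() is Scheduler()  # one shared scheduler per process
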
@@ -12,7 +14,7 @@ class Singleton(type):


 class Scheduler(metaclass=Singleton):
-    def Search(self, index_file_key, vectors, k):
+    def search(self, index_file_key, vectors, k):
         # assert index_file_key
         # assert vectors
         assert k != 0

@@ -20,7 +22,6 @@ class Scheduler(metaclass=Singleton):
         query_vectors = serialize.to_array(vectors)
         return self.__scheduler(index_file_key, query_vectors, k)

-
     def __scheduler(self, index_data_key, vectors, k):
         result_list = []

@@ -35,18 +36,35 @@ class Scheduler(metaclass=Singleton):
         if 'index' in index_data_key:
             index_data_list = index_data_key['index']
             for key in index_data_list:
-                index = GetIndexData(key)
+                index = get_index_data(key)
                 searcher = search_index.FaissSearch(index)
                 result_list.append(searcher.search_by_vectors(vectors, k))

         if len(result_list) == 1:
             return result_list[0].vectors

-        total_result = []
-        return result_list; # TODO(linxj): add topk
+        # result = search_index.top_k(result_list, k)
+        return result_list
+        # d_list = np.array([])
+        # v_list = np.array([])
+        # for result in result_list:
+        #     rd = result.distance
+        #     rv = result.vectors
+        #
+        #     td_list = np.array([])
+        #     tv_list = np.array([])
+        #     for d, v in zip(rd, rv):
+        #         td_list = np.append(td_list, d)
+        #         tv_list = np.append(tv_list, v)
+        #     d_list = np.add(d_list, td_list)
+        #     v_list = np.add(v_list, td_list)
+        #
+        # print(d_list)
+        # print(v_list)
+        # result_map = [d_list, v_list]
+        # top_k_result = search_index.top_k(result_map, k)
+        # return top_k_result


-def GetIndexData(key):
+def get_index_data(key):
     return serialize.read_index(key)

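The commented-out merge above sums distance arrays with np.add, which cannot produce a global top-k across partial results. A hedged sketch of a per-query merge that would fit here, assuming each partial result exposes the (nq, k) .distance and .vectors arrays produced by SearchResult; merge_top_k is a hypothetical helper, not part of the repo:

    import numpy as np

    def merge_top_k(result_list, k):
        # Stack each index's per-query candidates side by side, then re-select
        # the k smallest distances within every query row.
        D = np.concatenate([r.distance for r in result_list], axis=1)
        I = np.concatenate([r.vectors for r in result_list], axis=1)
        order = np.argsort(D, axis=1)[:, :k]
        return (np.take_along_axis(D, order, axis=1),
                np.take_along_axis(I, order, axis=1))
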
@@ -9,11 +9,10 @@ class TestScheduler(unittest.TestCase):
     def test_schedule(self):
         d = 64
         nb = 10000
-        nq = 100
+        nq = 2
         nt = 5000
         xt, xb, xq = get_dataset(d, nb, nt, nq)
-        file_name = "/tmp/faiss/tempfile_1"
-
+        file_name = "/tmp/tempfile_1"

         index = faiss.IndexFlatL2(d)
         print(index.is_trained)

@@ -26,17 +25,17 @@ class TestScheduler(unittest.TestCase):
         schuduler_instance = Scheduler()

         # query args 1
-        query_index = dict()
-        query_index['index'] = [file_name]
-        vectors = schuduler_instance.Search(query_index, vectors=xq, k=5)
-        assert np.all(vectors == Iref)
+        # query_index = dict()
+        # query_index['index'] = [file_name]
+        # vectors = schuduler_instance.search(query_index, vectors=xq, k=5)
+        # assert np.all(vectors == Iref)

         # query args 2
-        query_index = dict()
-        query_index['raw'] = xt
-        query_index['dimension'] = d
-        query_index['index'] = [file_name]
-        vectors = schuduler_instance.Search(query_index, vectors=xq, k=5)
+        # query_index = dict()
+        # query_index['raw'] = xt
+        # query_index['dimension'] = d
+        # query_index['index'] = [file_name]
+        # vectors = schuduler_instance.search(query_index, vectors=xq, k=5)
+        # print("success")

@@ -56,5 +55,6 @@ def get_dataset(d, nb, nt, nq):
     x = x.astype('float32')
     return x[:nt], x[nt:-nq], x[-nq:]

+
 if __name__ == "__main__":
     unittest.main()

@@ -150,7 +150,7 @@ class VectorEngine(object):
         scheduler_instance = Scheduler()
         vectors = []
         vectors.append(vector)
-        result = scheduler_instance.Search(index_map, vectors, limit)
+        result = scheduler_instance.search(index_map, vectors, limit)

         vector_id = 0

@@ -3,7 +3,7 @@ from enum import Enum, unique


 @unique
-class INDEX_DEVICES(Enum):
+class INDEXDEVICES(Enum):
     CPU = 0
     GPU = 1
     MULTI_GPU = 2

@@ -15,7 +15,7 @@ def FactoryIndex(index_name="DefaultIndex"):


 class Index():
-    def build(self, d, vectors, DEVICE=INDEX_DEVICES.CPU):
+    def build(self, d, vectors, DEVICE=INDEXDEVICES.CPU):
         pass

     @staticmethod

@@ -35,7 +35,7 @@ class DefaultIndex(Index):
         # maybe need to specify parameters
         pass

-    def build(self, d, vectors, DEVICE=INDEX_DEVICES.CPU):
+    def build(self, d, vectors, DEVICE=INDEXDEVICES.CPU):
         index = faiss.IndexFlatL2(d)  # trained
         index.add(vectors)
         return index

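As a usage sketch, assuming DefaultIndex takes no constructor arguments (as the pass-only __init__ above suggests), building and querying the flat index:

    import numpy as np

    d = 64
    vectors = np.random.random((1000, d)).astype('float32')
    index = DefaultIndex().build(d, vectors)  # exhaustive L2 index, no training step
    D, I = index.search(vectors[:5], 4)       # distances and ids of the 4 nearest
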
@@ -47,7 +47,7 @@ class LowMemoryIndex(Index):
         self.__bytes_per_vector = 8
         self.__bits_per_sub_vector = 8

-    def build(d, vectors, DEVICE=INDEX_DEVICES.CPU):
+    def build(self, d, vectors, DEVICE=INDEXDEVICES.CPU):
         # quantizer = faiss.IndexFlatL2(d)
         # index = faiss.IndexIVFPQ(quantizer, d, self.nlist,
         #                          self.__bytes_per_vector, self.__bits_per_sub_vector)

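A hedged sketch of what the commented-out IVFPQ build could look like once filled in; nlist and the training set are assumptions, and unlike IndexFlatL2 the index must be trained before vectors are added:

    import faiss
    import numpy as np

    d, nlist = 64, 100
    xt = np.random.random((5000, d)).astype('float32')

    quantizer = faiss.IndexFlatL2(d)                     # coarse quantizer over centroids
    index = faiss.IndexIVFPQ(quantizer, d, nlist, 8, 8)  # 8 sub-quantizers, 8 bits each
    index.train(xt)                                      # PQ codebooks need training data
    index.add(xt)
    D, I = index.search(xt[:5], 4)
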
@@ -1,11 +1,14 @@
 import faiss
 import numpy as np

+
 def write_index(index, file_name):
     faiss.write_index(index, file_name)

+
 def read_index(file_name):
     return faiss.read_index(file_name)

+
 def to_array(vec):
     return np.asarray(vec).astype('float32')

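A round-trip through these helpers, as a minimal sketch (the /tmp path is illustrative):

    import faiss
    import numpy as np

    d = 8
    index = faiss.IndexFlatL2(d)
    index.add(np.random.random((100, d)).astype('float32'))

    write_index(index, "/tmp/demo_index")     # persist to disk via faiss
    restored = read_index("/tmp/demo_index")  # load an equivalent index back
    assert restored.ntotal == index.ntotal
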
@@ -65,7 +65,6 @@ class TestBuildIndex(unittest.TestCase):
         assert np.all(Dnew == Dref) and np.all(Inew == Iref)

-
 def get_dataset(d, nb, nt, nq):
     """A dataset that is not completely random but still challenging to
     index

@@ -83,6 +82,5 @@ def get_dataset(d, nb, nt, nq):
     return x[:nt], x[nt:-nq], x[-nq:]

-
 if __name__ == "__main__":
     unittest.main()

@@ -1,4 +1,5 @@
 import faiss
 import numpy as np

+
 class SearchResult():

@@ -32,7 +33,9 @@ class FaissSearch():
         D, I = self.__index.search(vector_list, k)
         return SearchResult(D, I)

-import heapq
+
+# import heapq
 def top_k(input, k):
-    #sorted = heapq.nsmallest(k, input, key=input.key)
+    pass
+    # sorted = heapq.nsmallest(k, input, key=np.sum(input.get()))
+    # return sorted

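The stubbed top_k hints at heapq.nsmallest; a working sketch over (distance, id) pairs, assuming smallest distance ranks first:

    import heapq

    def top_k(candidates, k):
        # Keep the k pairs with the smallest distance component.
        return heapq.nsmallest(k, candidates, key=lambda pair: pair[0])

    print(top_k([(0.9, 7), (0.1, 3), (0.4, 5)], 2))  # [(0.1, 3), (0.4, 5)]
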
@@ -1,103 +0,0 @@
-# import numpy as np
-
-# d = 64                           # dimension
-# nb = 100000                      # database size
-# nq = 10000                       # nb of queries
-# np.random.seed(1234)             # make reproducible
-# xb = np.random.random((nb, d)).astype('float32')
-# xb[:, 0] += np.arange(nb) / 1000.
-# xq = np.random.random((nq, d)).astype('float32')
-# xq[:, 0] += np.arange(nq) / 1000.
-#
-# import faiss                     # make faiss available
-#
-# res = faiss.StandardGpuResources()  # use a single GPU
-#
-# ## Using a flat index
-#
-# index_flat = faiss.IndexFlatL2(d)  # build a flat (CPU) index
-#
-# # make it a flat GPU index
-# gpu_index_flat = faiss.index_cpu_to_gpu(res, 0, index_flat)
-#
-# gpu_index_flat.add(xb)           # add vectors to the index
-# print(gpu_index_flat.ntotal)
-#
-# k = 4                            # we want to see 4 nearest neighbors
-# D, I = gpu_index_flat.search(xq, k)  # actual search
-# print(I[:5])                     # neighbors of the 5 first queries
-# print(I[-5:])                    # neighbors of the 5 last queries
-#
-#
-# ## Using an IVF index
-#
-# nlist = 100
-# quantizer = faiss.IndexFlatL2(d)  # the other index
-# index_ivf = faiss.IndexIVFFlat(quantizer, d, nlist, faiss.METRIC_L2)
-# # here we specify METRIC_L2; by default it performs inner-product search
-#
-# # make it an IVF GPU index
-# gpu_index_ivf = faiss.index_cpu_to_gpu(res, 0, index_ivf)
-#
-# assert not gpu_index_ivf.is_trained
-# gpu_index_ivf.train(xb)          # train on the database vectors
-# assert gpu_index_ivf.is_trained
-#
-# gpu_index_ivf.add(xb)            # add vectors to the index
-# print(gpu_index_ivf.ntotal)
-#
-# k = 4                            # we want to see 4 nearest neighbors
-# D, I = gpu_index_ivf.search(xq, k)  # actual search
-# print(I[:5])                     # neighbors of the 5 first queries
-# print(I[-5:])
-
-
-import numpy as np
-import pytest
-
-
-@pytest.mark.skip(reason="Not for pytest")
-def basic_test():
-    d = 64                           # dimension
-    nb = 100000                      # database size
-    nq = 10000                       # nb of queries
-    np.random.seed(1234)             # make reproducible
-    xb = np.random.random((nb, d)).astype('float32')
-    xb[:, 0] += np.arange(nb) / 1000.
-    xc = np.random.random((nb, d)).astype('float32')
-    xc[:, 0] += np.arange(nb) / 1000.
-    xq = np.random.random((nq, d)).astype('float32')
-    xq[:, 0] += np.arange(nq) / 1000.
-
-    import faiss                     # make faiss available
-    index = faiss.IndexFlatL2(d)     # build the index
-    print(index.is_trained)
-    index.add(xb)                    # add vectors to the index
-    print(index.ntotal)
-    # faiss.write_index(index, "/tmp/faiss/tempfile_1")
-    writer = faiss.VectorIOWriter()
-    faiss.write_index(index, writer)
-    ar_data = faiss.vector_to_array(writer.data)
-    import pickle
-    pickle.dump(ar_data, open("/tmp/faiss/ser_1", "wb"))
-
-    # index_3 = pickle.load("/tmp/faiss/ser_1")
-
-    # index_2 = faiss.IndexFlatL2(d)  # build the index
-    # print(index_2.is_trained)
-    # index_2.add(xc)                 # add vectors to the index
-    # print(index_2.ntotal)
-    # faiss.write_index(index, "/tmp/faiss/tempfile_2")
-    #
-    # index_3 = faiss.read_index
-
-    # k = 4                           # we want to see 4 nearest neighbors
-    # D, I = index.search(xb[:5], k)  # sanity check
-    # print(I)
-    # print(D)
-    # D, I = index.search(xq, k)      # actual search
-    # print(I[:5])                    # neighbors of the 5 first queries
-    # print(I[-5:])                   # neighbors of the 5 last queries
-
-
-if __name__ == '__main__':
-    basic_test()

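The deleted experiment serializes an index by hand through VectorIOWriter, vector_to_array, and pickle; faiss also exposes this round trip directly. A minimal sketch, assuming a faiss build that ships serialize_index/deserialize_index:

    import faiss
    import numpy as np

    d = 64
    index = faiss.IndexFlatL2(d)
    index.add(np.random.random((1000, d)).astype('float32'))

    buf = faiss.serialize_index(index)       # uint8 numpy array holding the index bytes
    restored = faiss.deserialize_index(buf)  # rebuild in memory, no disk involved
    assert restored.ntotal == index.ntotal
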
@@ -3,6 +3,7 @@ from ..search_index import *
 import unittest
 import numpy as np

+
 class TestSearchSingleThread(unittest.TestCase):
     def test_search_by_vectors(self):
         d = 64