# milvus/pyengine/engine/controller/vector_engine.py

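"""Vector engine controller of the pyengine prototype.

VectorEngine ties together MetaManager (group / file metadata),
StorageManager (group storage) and an in-process cache of raw vectors.
AddVector buffers incoming vectors per group and, once a raw file reaches
ROW_LIMIT rows, builds an index over it via engine.ingestion.build_index;
SearchVector hands the group's index files and buffered raw vectors to the
Scheduler. Most methods return an error code followed by a payload.
"""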
from engine.model.group_table import GroupTable
from engine.model.file_table import FileTable
from engine.controller.raw_file_handler import RawFileHandler
from engine.controller.group_handler import GroupHandler
from engine.controller.index_file_handler import IndexFileHandler
from engine.settings import ROW_LIMIT
from flask import jsonify
from engine.ingestion import build_index
from engine.controller.scheduler import Scheduler
from engine.ingestion import serialize
from engine.controller.meta_manager import MetaManager
from engine.controller.error_code import ErrorCode
from engine.controller.storage_manager import StorageManager
from datetime import date
import os
import sys


class VectorEngine(object):
    # In-process caches of raw vectors and their ids, keyed by group name
    # (filled by InsertVectorIntoRawFile, read by GetVectorListFromRawFile).
    group_vector_dict = None
    group_vector_id_dict = None

    SUCCESS_CODE = 0
    FAULT_CODE = 1
    GROUP_NOT_EXIST = 2

    @staticmethod
    def AddGroup(group_name, dimension):
        error, group = MetaManager.GetGroup(group_name)
        if error == ErrorCode.SUCCESS_CODE:
            # group already exists
            return ErrorCode.FAULT_CODE, group_name
        else:
            StorageManager.AddGroup(group_name)
            MetaManager.AddGroup(group_name, dimension)
            MetaManager.Sync()
            return ErrorCode.SUCCESS_CODE, group_name

    @staticmethod
    def GetGroup(group_name):
        error, _ = MetaManager.GetGroup(group_name)
        return error, group_name

    @staticmethod
    def DeleteGroup(group_name):
        group = GroupTable.query.filter(GroupTable.group_name == group_name).first()
        if group:
            MetaManager.DeleteGroup(group)
            StorageManager.DeleteGroup(group_name)
            MetaManager.DeleteGroupFiles(group_name)
            MetaManager.Sync()
            return VectorEngine.SUCCESS_CODE, group_name
        else:
            # Deleting a non-existent group is also reported as success.
            return VectorEngine.SUCCESS_CODE, group_name

    @staticmethod
    def GetGroupList():
        groups = MetaManager.GetAllGroup()

        group_list = []
        for group_tuple in groups:
            group_item = {}
            group_item['group_name'] = group_tuple.group_name
            group_item['file_number'] = 0
            group_list.append(group_item)

        return VectorEngine.SUCCESS_CODE, group_list

    @staticmethod
    def AddVectorToNewFile(group_name):
        pass

    @staticmethod
    def AddVectorToExistFile(group_name):
        pass

    @staticmethod
    def AddVector(group_name, vectors):
        """Append vectors to the group's current raw file.

        When the raw file would exceed ROW_LIMIT, the filled part is converted
        into an index file and the remaining vectors go into a new raw file.
        Returns (code, list of 'group_name.vector_id' strings).
        """
        print(group_name, vectors)
        error, group = MetaManager.GetGroup(group_name)
        if error == VectorEngine.FAULT_CODE:
            return VectorEngine.GROUP_NOT_EXIST, 'invalid'

        # Locate the group's current raw file.
        raw_filename = str(group.file_number)
        files = MetaManager.GetAllRawFiles(group_name)
        current_raw_row_number = 0
        current_raw_file = None
        if files is not None:
            for file in files:
                if file.filename == raw_filename:
                    current_raw_file = file
                    current_raw_row_number = file.row_number
                    print(raw_filename)
                else:
                    print("---- To Build Index")

        vector_str_list = []
        # Verify whether existing rows + incoming rows exceed the file limit.
        incoming_row_number = len(vectors)
        start_row_index = 0
        total_row_number = group.row_number
        table_row_number = current_raw_row_number

        if current_raw_row_number + incoming_row_number > ROW_LIMIT:
            # Insert into the existing raw file up to ROW_LIMIT.
            start_row_index = ROW_LIMIT - current_raw_row_number
            for i in range(0, start_row_index, 1):
                total_row_number += 1
                vector_id = total_row_number
                VectorEngine.InsertVectorIntoRawFile(group_name, raw_filename, vectors[i], vector_id)
                table_row_number += 1
                vector_str_list.append(group_name + '.' + str(vector_id))

            # Build an index over the full raw file.
            raw_vector_array, raw_vector_id_array = VectorEngine.GetVectorListFromRawFile(group_name)
            d = group.dimension

            # create index
            index_builder = build_index.FactoryIndex()
            index = index_builder().build(d, raw_vector_array, raw_vector_id_array)

            # TODO(jinhai): store index into Cache
            # Convert the (now full) raw file record into an index file.
            index_filename = current_raw_file.filename + '_index'
            serialize.write_index(file_name=index_filename, index=index)
            MetaManager.UpdateFile(current_raw_file.filename,
                                   {'row_number': ROW_LIMIT, 'type': 'index', 'filename': index_filename})

            # Create a new raw file for the remaining vectors.
            raw_filename = str(group.file_number + 1)
            table_row_number = 0
            # update file table
            MetaManager.CreateRawFile(group_name, raw_filename)

        # Append vectors to raw file
        if current_raw_file is None:
            # update file table
            MetaManager.CreateRawFile(group_name, raw_filename)

        # 1. update db
        new_group_file_number = group.file_number + 1
        new_group_row_number = int(group.row_number) + incoming_row_number
        MetaManager.UpdateGroup(group_name, {'file_number': new_group_file_number, 'row_number': new_group_row_number})

        # 2. store vector into raw files
        for i in range(start_row_index, incoming_row_number, 1):
            vector_id = total_row_number
            total_row_number += 1
            VectorEngine.InsertVectorIntoRawFile(group_name, raw_filename, vectors[i], vector_id)
            table_row_number += 1
            vector_str_list.append(group_name + '.' + str(vector_id))

        MetaManager.UpdateFile(raw_filename, {'row_number': table_row_number})
        MetaManager.UpdateGroup(group_name, {'row_number': total_row_number})

        # 3. sync
        MetaManager.Sync()

        return VectorEngine.SUCCESS_CODE, vector_str_list

    @staticmethod
    def SearchVector(group_id, vector, limit):
        # Check that the group exists.
        code, _ = VectorEngine.GetGroup(group_id)
        if code == VectorEngine.FAULT_CODE:
            return VectorEngine.GROUP_NOT_EXIST, {}

        group = GroupTable.query.filter(GroupTable.group_name == group_id).first()

        # Collect all index files plus the buffered raw vectors of the group.
        files = FileTable.query.filter(FileTable.group_name == group_id).all()
        index_keys = [i.filename for i in files if i.type == 'index']

        index_map = {}
        index_map['index'] = index_keys
        index_map['raw'], index_map['raw_id'] = VectorEngine.GetVectorListFromRawFile(group_id, "fakename")  # TODO: pass by key, get from storage
        index_map['dimension'] = group.dimension

        scheduler_instance = Scheduler()

        vectors = []
        vectors.append(vector)

        result = scheduler_instance.search(index_map, vectors, limit)

        # Convert the returned int ids back to 'group_id.vector_id' strings.
        vector_ids_str = []
        for int_id in result:
            vector_ids_str.append(group_id + '.' + str(int_id))

        return VectorEngine.SUCCESS_CODE, vector_ids_str

    @staticmethod
    def CreateIndex(group_id):
        # Check that the group exists.
        code, _ = VectorEngine.GetGroup(group_id)
        if code == VectorEngine.FAULT_CODE:
            return VectorEngine.GROUP_NOT_EXIST

        # create index
        file = FileTable.query.filter(FileTable.group_name == group_id).filter(FileTable.type == 'raw').first()
        path = GroupHandler.GetGroupDirectory(group_id) + '/' + file.filename
        print('Going to create index for: ', path)

        return VectorEngine.SUCCESS_CODE

    @staticmethod
    def InsertVectorIntoRawFile(group_id, filename, vector, vector_id):
        # Buffer the vector and its id in the in-process caches rather than
        # writing to disk; GetVectorListFromRawFile reads them back.
        # print(sys._getframe().f_code.co_name, group_id, vector)
        # path = GroupHandler.GetGroupDirectory(group_id) + '/' + filename
        if VectorEngine.group_vector_dict is None:
            # print("VectorEngine.group_vector_dict is None")
            VectorEngine.group_vector_dict = dict()

        if VectorEngine.group_vector_id_dict is None:
            VectorEngine.group_vector_id_dict = dict()

        if group_id not in VectorEngine.group_vector_dict:
            VectorEngine.group_vector_dict[group_id] = []

        if group_id not in VectorEngine.group_vector_id_dict:
            VectorEngine.group_vector_id_dict[group_id] = []

        VectorEngine.group_vector_dict[group_id].append(vector)
        VectorEngine.group_vector_id_dict[group_id].append(vector_id)

        # print('InsertVectorIntoRawFile: ', VectorEngine.group_vector_dict[group_id], VectorEngine.group_vector_id_dict[group_id])
        print("cache size: ", len(VectorEngine.group_vector_dict[group_id]))

        return filename

    @staticmethod
    def GetVectorListFromRawFile(group_id, filename="todo"):
        # Return the buffered vectors and ids of the group as arrays that the
        # index builder and scheduler can consume.
        # print("GetVectorListFromRawFile, vectors: ", serialize.to_array(VectorEngine.group_vector_dict[group_id]))
        # print("GetVectorListFromRawFile, vector_ids: ", serialize.to_int_array(VectorEngine.group_vector_id_dict[group_id]))
        return serialize.to_array(VectorEngine.group_vector_dict[group_id]), \
            serialize.to_int_array(VectorEngine.group_vector_id_dict[group_id])

    @staticmethod
    def ClearRawFile(group_id):
        # Drop the in-process cache for the group.
        print("VectorEngine.group_vector_dict: ", VectorEngine.group_vector_dict)
        del VectorEngine.group_vector_dict[group_id]
        del VectorEngine.group_vector_id_dict[group_id]
        return VectorEngine.SUCCESS_CODE
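
# Minimal usage sketch (an assumption, not part of the original module: it
# presumes the Flask / SQLAlchemy application context that MetaManager and
# the model tables expect is already set up; 'demo_group' and the
# 8-dimensional vectors are made-up example values):
#
#   code, name = VectorEngine.AddGroup('demo_group', 8)
#   code, ids = VectorEngine.AddVector('demo_group', [[0.1] * 8, [0.2] * 8])
#   code, hits = VectorEngine.SearchVector('demo_group', [0.1] * 8, limit=5)
#   VectorEngine.ClearRawFile('demo_group')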