mirror of https://github.com/milvus-io/milvus.git
Update vector engine
parent
6c4cae438a
commit
3b629dc6b4
|
@ -41,4 +41,20 @@ class MetaManager(object):
|
|||
for record in records:
|
||||
# print("record.group_name: ", record.group_name)
|
||||
db.session.delete(record)
|
||||
|
||||
|
||||
@staticmethod
|
||||
def UpdateGroup(group_name, data):
|
||||
GroupTable.query.filter(GroupTable.group_name==group_name).update(data)
|
||||
|
||||
|
||||
@staticmethod
|
||||
def GetAllRawFiles(group_name):
|
||||
FileTable.query.filter(FileTable.group_name == group_name and FileTable.type == 'raw')
|
||||
|
||||
@staticmethod
|
||||
def CreateRawFile(group_name, filename):
|
||||
db.session.add(FileTable(group_name, filename, 'raw', 0))
|
||||
|
||||
@staticmethod
|
||||
def UpdateFile(filename, data):
|
||||
FileTable.query.filter(FileTable.filename == filename).update(data)
|
||||
|
|
|
@ -49,6 +49,7 @@ class TestVectorEngine:
|
|||
# Add vector for exist group
|
||||
code, vector_id = VectorEngine.AddVector('test_group', self.__vectors)
|
||||
assert code == ErrorCode.SUCCESS_CODE
|
||||
print(vector_id)
|
||||
assert vector_id == ['test_group.0', 'test_group.1', 'test_group.2', 'test_group.3', 'test_group.4', 'test_group.5', 'test_group.6', 'test_group.7', 'test_group.8', 'test_group.9']
|
||||
|
||||
# Check search vector interface
|
||||
|
|
|
@ -5,13 +5,13 @@ from engine.controller.group_handler import GroupHandler
|
|||
from engine.controller.index_file_handler import IndexFileHandler
|
||||
from engine.settings import ROW_LIMIT
|
||||
from flask import jsonify
|
||||
from engine import db
|
||||
from engine.ingestion import build_index
|
||||
from engine.controller.scheduler import Scheduler
|
||||
from engine.ingestion import serialize
|
||||
from engine.controller.meta_manager import MetaManager
|
||||
from engine.controller.error_code import ErrorCode
|
||||
from engine.controller.storage_manager import StorageManager
|
||||
from datetime import date
|
||||
import sys, os
|
||||
|
||||
class VectorEngine(object):
|
||||
|
@ -63,66 +63,101 @@ class VectorEngine(object):
|
|||
|
||||
return VectorEngine.SUCCESS_CODE, group_list
|
||||
|
||||
@staticmethod
|
||||
def AddVectorToNewFile(group_name):
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def AddVectorToExistFile(group_name):
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def AddVector(group_name, vectors):
|
||||
print(group_name, vectors)
|
||||
error, _ = MetaManager.GetGroup(group_name)
|
||||
error, group = MetaManager.GetGroup(group_name)
|
||||
if error == VectorEngine.FAULT_CODE:
|
||||
return VectorEngine.GROUP_NOT_EXIST, 'invalid'
|
||||
|
||||
vector_str_list = []
|
||||
for vector in vectors:
|
||||
file = FileTable.query.filter(FileTable.group_name == group_name).filter(FileTable.type == 'raw').first()
|
||||
group = GroupTable.query.filter(GroupTable.group_name == group_name).first()
|
||||
# first raw file
|
||||
raw_filename = str(group.file_number)
|
||||
files = MetaManager.GetAllRawFiles(group_name)
|
||||
|
||||
if file:
|
||||
print('insert into exist file')
|
||||
# create vector id
|
||||
vector_id = file.seq_no + 1
|
||||
# insert into raw file
|
||||
VectorEngine.InsertVectorIntoRawFile(group_name, file.filename, vector, vector_id)
|
||||
|
||||
# check if the file can be indexed
|
||||
if file.row_number + 1 >= ROW_LIMIT:
|
||||
raw_vector_array, raw_vector_id_array = VectorEngine.GetVectorListFromRawFile(group_name)
|
||||
d = group.dimension
|
||||
|
||||
# create index
|
||||
index_builder = build_index.FactoryIndex()
|
||||
index = index_builder().build(d, raw_vector_array, raw_vector_id_array)
|
||||
|
||||
# TODO(jinhai): store index into Cache
|
||||
index_filename = file.filename + '_index'
|
||||
serialize.write_index(file_name=index_filename, index=index)
|
||||
|
||||
FileTable.query.filter(FileTable.group_name == group_name).filter(FileTable.type == 'raw').update({'row_number':file.row_number + 1,
|
||||
'type': 'index',
|
||||
'filename': index_filename,
|
||||
'seq_no': file.seq_no + 1})
|
||||
db.session.commit()
|
||||
VectorEngine.group_dict = None
|
||||
current_raw_row_number = 0
|
||||
current_raw_file = None
|
||||
if files != None:
|
||||
for file in files:
|
||||
if file.filename == raw_filename:
|
||||
current_raw_file = file
|
||||
current_raw_row_number = file.row_number
|
||||
print(raw_filename)
|
||||
else:
|
||||
# we still can insert into exist raw file, update database
|
||||
FileTable.query.filter(FileTable.group_name == group_name).filter(FileTable.type == 'raw').update({'row_number':file.row_number + 1,
|
||||
'seq_no': file.seq_no + 1})
|
||||
db.session.commit()
|
||||
print('Update db for raw file insertion')
|
||||
print("---- To Build Index")
|
||||
else:
|
||||
pass
|
||||
|
||||
else:
|
||||
print('add a new raw file')
|
||||
# first raw file
|
||||
raw_filename = group_name + '.raw'
|
||||
# create vector id
|
||||
vector_id = 0
|
||||
# create and insert vector into raw file
|
||||
VectorEngine.InsertVectorIntoRawFile(group_name, raw_filename, vector, vector_id)
|
||||
# insert a record into database
|
||||
db.session.add(FileTable(group_name, raw_filename, 'raw', 1))
|
||||
db.session.commit()
|
||||
vector_str_list = []
|
||||
|
||||
# Verify if the row number + incoming row > limit
|
||||
incoming_row_number = len(vectors)
|
||||
|
||||
start_row_index = 0
|
||||
total_row_number = group.row_number
|
||||
table_row_number = current_raw_row_number
|
||||
if current_raw_row_number + incoming_row_number > ROW_LIMIT:
|
||||
# Insert into exist raw file
|
||||
start_row_index = ROW_LIMIT - current_raw_row_number
|
||||
|
||||
for i in range(0, start_row_index, 1):
|
||||
total_row_number += 1
|
||||
vector_id = total_row_number
|
||||
VectorEngine.InsertVectorIntoRawFile(group_name, raw_filename, vectors[i], vector_id)
|
||||
++ table_row_number
|
||||
vector_str_list.append(group_name + '.' + str(vector_id))
|
||||
|
||||
# Build index
|
||||
raw_vector_array, raw_vector_id_array = VectorEngine.GetVectorListFromRawFile(group_name)
|
||||
d = group.dimension
|
||||
|
||||
# create index
|
||||
index_builder = build_index.FactoryIndex()
|
||||
index = index_builder().build(d, raw_vector_array, raw_vector_id_array)
|
||||
|
||||
# TODO(jinhai): store index into Cache
|
||||
index_filename = file.filename + '_index'
|
||||
serialize.write_index(file_name=index_filename, index=index)
|
||||
|
||||
UpdateFile(file.filename, {'row_number': ROW_LIMIT, 'type': 'index', 'filename': index_filename})
|
||||
|
||||
# create new raw file name
|
||||
raw_filename = str(group.file_number + 1)
|
||||
table_row_number = 0
|
||||
|
||||
# update file table
|
||||
MetaManager.CreateRawFile(group_name, raw_filename)
|
||||
|
||||
# Append vectors to raw file
|
||||
if current_raw_file == None:
|
||||
# update file table
|
||||
MetaManager.CreateRawFile(group_name, raw_filename)
|
||||
|
||||
# 1. update db
|
||||
new_group_file_number = group.file_number + 1
|
||||
new_group_row_number = int(group.row_number) + incoming_row_number
|
||||
MetaManager.UpdateGroup(group_name, {'file_number': new_group_file_number, 'row_number': new_group_row_number})
|
||||
|
||||
# 2. store vector into raw files
|
||||
for i in range (start_row_index, incoming_row_number, 1):
|
||||
vector_id = total_row_number
|
||||
total_row_number += 1
|
||||
VectorEngine.InsertVectorIntoRawFile(group_name, raw_filename, vectors[i], vector_id)
|
||||
++ table_row_number
|
||||
vector_str_list.append(group_name + '.' + str(vector_id))
|
||||
|
||||
MetaManager.UpdateFile(raw_filename, {'row_number': table_row_number})
|
||||
|
||||
MetaManager.UpdateGroup(group_name, {'row_number': total_row_number})
|
||||
# 3. sync
|
||||
MetaManager.Sync()
|
||||
return VectorEngine.SUCCESS_CODE, vector_str_list
|
||||
|
||||
|
||||
|
|
|
@ -7,7 +7,7 @@ class FileTable(db.Model):
|
|||
filename = db.Column(db.String(100))
|
||||
type = db.Column(db.String(100))
|
||||
row_number = db.Column(db.Integer)
|
||||
seq_no = db.Column(db.Integer)
|
||||
date = db.Column(db.Date)
|
||||
|
||||
|
||||
def __init__(self, group_name, filename, type, row_number):
|
||||
|
@ -16,7 +16,6 @@ class FileTable(db.Model):
|
|||
self.type = type
|
||||
self.row_number = row_number
|
||||
self.type = type
|
||||
self.seq_no = 0
|
||||
|
||||
|
||||
def __repr__(self):
|
||||
|
|
|
@ -5,6 +5,7 @@ class GroupTable(db.Model):
|
|||
id = db.Column(db.Integer, primary_key=True)
|
||||
group_name = db.Column(db.String(100))
|
||||
file_number = db.Column(db.Integer)
|
||||
row_number = db.Column(db.BigInteger)
|
||||
dimension = db.Column(db.Integer)
|
||||
|
||||
|
||||
|
@ -12,6 +13,7 @@ class GroupTable(db.Model):
|
|||
self.group_name = group_name
|
||||
self.dimension = dimension
|
||||
self.file_number = 0
|
||||
self.row_number = 0
|
||||
|
||||
|
||||
def __repr__(self):
|
||||
|
|
Loading…
Reference in New Issue