diff --git a/pyengine/engine/controller/meta_manager.py b/pyengine/engine/controller/meta_manager.py index e34ca53d84..dcfabc78a8 100644 --- a/pyengine/engine/controller/meta_manager.py +++ b/pyengine/engine/controller/meta_manager.py @@ -41,4 +41,20 @@ class MetaManager(object): for record in records: # print("record.group_name: ", record.group_name) db.session.delete(record) - \ No newline at end of file + + @staticmethod + def UpdateGroup(group_name, data): + GroupTable.query.filter(GroupTable.group_name==group_name).update(data) + + + @staticmethod + def GetAllRawFiles(group_name): + FileTable.query.filter(FileTable.group_name == group_name and FileTable.type == 'raw') + + @staticmethod + def CreateRawFile(group_name, filename): + db.session.add(FileTable(group_name, filename, 'raw', 0)) + + @staticmethod + def UpdateFile(filename, data): + FileTable.query.filter(FileTable.filename == filename).update(data) diff --git a/pyengine/engine/controller/tests/test_vector_engine.py b/pyengine/engine/controller/tests/test_vector_engine.py index aceb3ecfbb..eb5193cd05 100644 --- a/pyengine/engine/controller/tests/test_vector_engine.py +++ b/pyengine/engine/controller/tests/test_vector_engine.py @@ -49,6 +49,7 @@ class TestVectorEngine: # Add vector for exist group code, vector_id = VectorEngine.AddVector('test_group', self.__vectors) assert code == ErrorCode.SUCCESS_CODE + print(vector_id) assert vector_id == ['test_group.0', 'test_group.1', 'test_group.2', 'test_group.3', 'test_group.4', 'test_group.5', 'test_group.6', 'test_group.7', 'test_group.8', 'test_group.9'] # Check search vector interface diff --git a/pyengine/engine/controller/vector_engine.py b/pyengine/engine/controller/vector_engine.py index 454c363697..e8805a5eee 100644 --- a/pyengine/engine/controller/vector_engine.py +++ b/pyengine/engine/controller/vector_engine.py @@ -5,13 +5,13 @@ from engine.controller.group_handler import GroupHandler from engine.controller.index_file_handler import IndexFileHandler from engine.settings import ROW_LIMIT from flask import jsonify -from engine import db from engine.ingestion import build_index from engine.controller.scheduler import Scheduler from engine.ingestion import serialize from engine.controller.meta_manager import MetaManager from engine.controller.error_code import ErrorCode from engine.controller.storage_manager import StorageManager +from datetime import date import sys, os class VectorEngine(object): @@ -63,66 +63,101 @@ class VectorEngine(object): return VectorEngine.SUCCESS_CODE, group_list + @staticmethod + def AddVectorToNewFile(group_name): + pass + + @staticmethod + def AddVectorToExistFile(group_name): + pass @staticmethod def AddVector(group_name, vectors): print(group_name, vectors) - error, _ = MetaManager.GetGroup(group_name) + error, group = MetaManager.GetGroup(group_name) if error == VectorEngine.FAULT_CODE: return VectorEngine.GROUP_NOT_EXIST, 'invalid' - vector_str_list = [] - for vector in vectors: - file = FileTable.query.filter(FileTable.group_name == group_name).filter(FileTable.type == 'raw').first() - group = GroupTable.query.filter(GroupTable.group_name == group_name).first() + # first raw file + raw_filename = str(group.file_number) + files = MetaManager.GetAllRawFiles(group_name) - if file: - print('insert into exist file') - # create vector id - vector_id = file.seq_no + 1 - # insert into raw file - VectorEngine.InsertVectorIntoRawFile(group_name, file.filename, vector, vector_id) - - # check if the file can be indexed - if file.row_number + 1 >= ROW_LIMIT: - raw_vector_array, raw_vector_id_array = VectorEngine.GetVectorListFromRawFile(group_name) - d = group.dimension - - # create index - index_builder = build_index.FactoryIndex() - index = index_builder().build(d, raw_vector_array, raw_vector_id_array) - - # TODO(jinhai): store index into Cache - index_filename = file.filename + '_index' - serialize.write_index(file_name=index_filename, index=index) - - FileTable.query.filter(FileTable.group_name == group_name).filter(FileTable.type == 'raw').update({'row_number':file.row_number + 1, - 'type': 'index', - 'filename': index_filename, - 'seq_no': file.seq_no + 1}) - db.session.commit() - VectorEngine.group_dict = None + current_raw_row_number = 0 + current_raw_file = None + if files != None: + for file in files: + if file.filename == raw_filename: + current_raw_file = file + current_raw_row_number = file.row_number + print(raw_filename) else: - # we still can insert into exist raw file, update database - FileTable.query.filter(FileTable.group_name == group_name).filter(FileTable.type == 'raw').update({'row_number':file.row_number + 1, - 'seq_no': file.seq_no + 1}) - db.session.commit() - print('Update db for raw file insertion') + print("---- To Build Index") + else: + pass - else: - print('add a new raw file') - # first raw file - raw_filename = group_name + '.raw' - # create vector id - vector_id = 0 - # create and insert vector into raw file - VectorEngine.InsertVectorIntoRawFile(group_name, raw_filename, vector, vector_id) - # insert a record into database - db.session.add(FileTable(group_name, raw_filename, 'raw', 1)) - db.session.commit() + vector_str_list = [] + # Verify if the row number + incoming row > limit + incoming_row_number = len(vectors) + + start_row_index = 0 + total_row_number = group.row_number + table_row_number = current_raw_row_number + if current_raw_row_number + incoming_row_number > ROW_LIMIT: + # Insert into exist raw file + start_row_index = ROW_LIMIT - current_raw_row_number + + for i in range(0, start_row_index, 1): + total_row_number += 1 + vector_id = total_row_number + VectorEngine.InsertVectorIntoRawFile(group_name, raw_filename, vectors[i], vector_id) + ++ table_row_number + vector_str_list.append(group_name + '.' + str(vector_id)) + + # Build index + raw_vector_array, raw_vector_id_array = VectorEngine.GetVectorListFromRawFile(group_name) + d = group.dimension + + # create index + index_builder = build_index.FactoryIndex() + index = index_builder().build(d, raw_vector_array, raw_vector_id_array) + + # TODO(jinhai): store index into Cache + index_filename = file.filename + '_index' + serialize.write_index(file_name=index_filename, index=index) + + UpdateFile(file.filename, {'row_number': ROW_LIMIT, 'type': 'index', 'filename': index_filename}) + + # create new raw file name + raw_filename = str(group.file_number + 1) + table_row_number = 0 + + # update file table + MetaManager.CreateRawFile(group_name, raw_filename) + + # Append vectors to raw file + if current_raw_file == None: + # update file table + MetaManager.CreateRawFile(group_name, raw_filename) + + # 1. update db + new_group_file_number = group.file_number + 1 + new_group_row_number = int(group.row_number) + incoming_row_number + MetaManager.UpdateGroup(group_name, {'file_number': new_group_file_number, 'row_number': new_group_row_number}) + + # 2. store vector into raw files + for i in range (start_row_index, incoming_row_number, 1): + vector_id = total_row_number + total_row_number += 1 + VectorEngine.InsertVectorIntoRawFile(group_name, raw_filename, vectors[i], vector_id) + ++ table_row_number vector_str_list.append(group_name + '.' + str(vector_id)) + MetaManager.UpdateFile(raw_filename, {'row_number': table_row_number}) + + MetaManager.UpdateGroup(group_name, {'row_number': total_row_number}) + # 3. sync + MetaManager.Sync() return VectorEngine.SUCCESS_CODE, vector_str_list diff --git a/pyengine/engine/model/file_table.py b/pyengine/engine/model/file_table.py index 093daa5b32..6588c7123e 100644 --- a/pyengine/engine/model/file_table.py +++ b/pyengine/engine/model/file_table.py @@ -7,7 +7,7 @@ class FileTable(db.Model): filename = db.Column(db.String(100)) type = db.Column(db.String(100)) row_number = db.Column(db.Integer) - seq_no = db.Column(db.Integer) + date = db.Column(db.Date) def __init__(self, group_name, filename, type, row_number): @@ -16,7 +16,6 @@ class FileTable(db.Model): self.type = type self.row_number = row_number self.type = type - self.seq_no = 0 def __repr__(self): diff --git a/pyengine/engine/model/group_table.py b/pyengine/engine/model/group_table.py index 3fe7d4f337..da3f4f7f55 100644 --- a/pyengine/engine/model/group_table.py +++ b/pyengine/engine/model/group_table.py @@ -5,6 +5,7 @@ class GroupTable(db.Model): id = db.Column(db.Integer, primary_key=True) group_name = db.Column(db.String(100)) file_number = db.Column(db.Integer) + row_number = db.Column(db.BigInteger) dimension = db.Column(db.Integer) @@ -12,6 +13,7 @@ class GroupTable(db.Model): self.group_name = group_name self.dimension = dimension self.file_number = 0 + self.row_number = 0 def __repr__(self):