mirror of https://github.com/milvus-io/milvus.git
refactor(db): CreateTableFile
Former-commit-id: c29b8ccc97cc327ec9c278f2ea5456369365c4f3pull/191/head
parent
9d4591612d
commit
01e4e605e2
|
@ -259,17 +259,17 @@ void DBImpl<EngineT>::background_call() {
|
|||
template<typename EngineT>
|
||||
Status DBImpl<EngineT>::merge_files(const std::string& table_id, const meta::DateT& date,
|
||||
const meta::TableFilesSchema& files) {
|
||||
meta::TableFileSchema group_file;
|
||||
group_file.table_id = table_id;
|
||||
group_file.date = date;
|
||||
Status status = _pMeta->add_group_file(group_file);
|
||||
meta::TableFileSchema table_file;
|
||||
table_file.table_id = table_id;
|
||||
table_file.date = date;
|
||||
Status status = _pMeta->CreateTableFile(table_file);
|
||||
|
||||
if (!status.ok()) {
|
||||
LOG(INFO) << status.ToString() << std::endl;
|
||||
return status;
|
||||
}
|
||||
|
||||
EngineT index(group_file.dimension, group_file.location);
|
||||
EngineT index(table_file.dimension, table_file.location);
|
||||
|
||||
meta::TableFilesSchema updated;
|
||||
long index_size = 0;
|
||||
|
@ -288,14 +288,14 @@ Status DBImpl<EngineT>::merge_files(const std::string& table_id, const meta::Dat
|
|||
index.Serialize();
|
||||
|
||||
if (index_size >= _options.index_trigger_size) {
|
||||
group_file.file_type = meta::TableFileSchema::TO_INDEX;
|
||||
table_file.file_type = meta::TableFileSchema::TO_INDEX;
|
||||
} else {
|
||||
group_file.file_type = meta::TableFileSchema::RAW;
|
||||
table_file.file_type = meta::TableFileSchema::RAW;
|
||||
}
|
||||
group_file.size = index_size;
|
||||
updated.push_back(group_file);
|
||||
table_file.size = index_size;
|
||||
updated.push_back(table_file);
|
||||
status = _pMeta->update_files(updated);
|
||||
LOG(DEBUG) << "New merged file " << group_file.file_id <<
|
||||
LOG(DEBUG) << "New merged file " << table_file.file_id <<
|
||||
" of size=" << index.PhysicalSize()/(1024*1024) << " M";
|
||||
|
||||
index.Cache();
|
||||
|
@ -337,10 +337,10 @@ Status DBImpl<EngineT>::background_merge_files(const std::string& table_id) {
|
|||
|
||||
template<typename EngineT>
|
||||
Status DBImpl<EngineT>::build_index(const meta::TableFileSchema& file) {
|
||||
meta::TableFileSchema group_file;
|
||||
group_file.table_id = file.table_id;
|
||||
group_file.date = file.date;
|
||||
Status status = _pMeta->add_group_file(group_file);
|
||||
meta::TableFileSchema table_file;
|
||||
table_file.table_id = file.table_id;
|
||||
table_file.date = file.date;
|
||||
Status status = _pMeta->CreateTableFile(table_file);
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
|
@ -348,18 +348,18 @@ Status DBImpl<EngineT>::build_index(const meta::TableFileSchema& file) {
|
|||
EngineT to_index(file.dimension, file.location);
|
||||
|
||||
to_index.Load();
|
||||
auto index = to_index.BuildIndex(group_file.location);
|
||||
auto index = to_index.BuildIndex(table_file.location);
|
||||
|
||||
group_file.file_type = meta::TableFileSchema::INDEX;
|
||||
group_file.size = index->Size();
|
||||
table_file.file_type = meta::TableFileSchema::INDEX;
|
||||
table_file.size = index->Size();
|
||||
|
||||
auto to_remove = file;
|
||||
to_remove.file_type = meta::TableFileSchema::TO_DELETE;
|
||||
|
||||
meta::TableFilesSchema update_files = {to_remove, group_file};
|
||||
meta::TableFilesSchema update_files = {to_remove, table_file};
|
||||
_pMeta->update_files(update_files);
|
||||
|
||||
LOG(DEBUG) << "New index file " << group_file.file_id << " of size "
|
||||
LOG(DEBUG) << "New index file " << table_file.file_id << " of size "
|
||||
<< index->PhysicalSize()/(1024*1024) << " M"
|
||||
<< " from file " << to_remove.file_id;
|
||||
|
||||
|
|
|
@ -218,35 +218,35 @@ Status DBMetaImpl::HasTable(const std::string& table_id, bool& has_or_not) {
|
|||
return Status::OK();
|
||||
}
|
||||
|
||||
Status DBMetaImpl::add_group_file(TableFileSchema& group_file) {
|
||||
if (group_file.date == EmptyDate) {
|
||||
group_file.date = Meta::GetDate();
|
||||
Status DBMetaImpl::CreateTableFile(TableFileSchema& file_schema) {
|
||||
if (file_schema.date == EmptyDate) {
|
||||
file_schema.date = Meta::GetDate();
|
||||
}
|
||||
TableSchema table_schema;
|
||||
table_schema.table_id = group_file.table_id;
|
||||
table_schema.table_id = file_schema.table_id;
|
||||
auto status = DescribeTable(table_schema);
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
|
||||
NextFileId(group_file.file_id);
|
||||
group_file.file_type = TableFileSchema::NEW;
|
||||
group_file.dimension = table_schema.dimension;
|
||||
group_file.size = 0;
|
||||
group_file.created_on = utils::GetMicroSecTimeStamp();
|
||||
group_file.updated_time = group_file.created_on;
|
||||
GetGroupFilePath(group_file);
|
||||
NextFileId(file_schema.file_id);
|
||||
file_schema.file_type = TableFileSchema::NEW;
|
||||
file_schema.dimension = table_schema.dimension;
|
||||
file_schema.size = 0;
|
||||
file_schema.created_on = utils::GetMicroSecTimeStamp();
|
||||
file_schema.updated_time = file_schema.created_on;
|
||||
GetGroupFilePath(file_schema);
|
||||
|
||||
{
|
||||
try {
|
||||
auto id = ConnectorPtr->insert(group_file);
|
||||
group_file.id = id;
|
||||
auto id = ConnectorPtr->insert(file_schema);
|
||||
file_schema.id = id;
|
||||
} catch (...) {
|
||||
return Status::DBTransactionError("Add file Error");
|
||||
}
|
||||
}
|
||||
|
||||
auto partition_path = GetGroupDatePartitionPath(group_file.table_id, group_file.date);
|
||||
auto partition_path = GetGroupDatePartitionPath(file_schema.table_id, file_schema.date);
|
||||
|
||||
if (!boost::filesystem::is_directory(partition_path)) {
|
||||
auto ret = boost::filesystem::create_directory(partition_path);
|
||||
|
|
|
@ -23,7 +23,7 @@ public:
|
|||
virtual Status DescribeTable(TableSchema& group_info_) override;
|
||||
virtual Status HasTable(const std::string& table_id, bool& has_or_not) override;
|
||||
|
||||
virtual Status add_group_file(TableFileSchema& group_file_info) override;
|
||||
virtual Status CreateTableFile(TableFileSchema& file_schema) override;
|
||||
virtual Status delete_group_partitions(const std::string& table_id,
|
||||
const meta::DatesT& dates) override;
|
||||
|
||||
|
|
|
@ -85,14 +85,14 @@ typename MemManager<EngineT>::MemVectorsPtr MemManager<EngineT>::get_mem_by_grou
|
|||
return memIt->second;
|
||||
}
|
||||
|
||||
meta::TableFileSchema group_file;
|
||||
group_file.table_id = table_id;
|
||||
auto status = _pMeta->add_group_file(group_file);
|
||||
meta::TableFileSchema table_file;
|
||||
table_file.table_id = table_id;
|
||||
auto status = _pMeta->CreateTableFile(table_file);
|
||||
if (!status.ok()) {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
_memMap[table_id] = MemVectorsPtr(new MemVectors<EngineT>(_pMeta, group_file, options_));
|
||||
_memMap[table_id] = MemVectorsPtr(new MemVectors<EngineT>(_pMeta, table_file, options_));
|
||||
return _memMap[table_id];
|
||||
}
|
||||
|
||||
|
|
|
@ -26,7 +26,7 @@ public:
|
|||
virtual Status DescribeTable(TableSchema& table_schema) = 0;
|
||||
virtual Status HasTable(const std::string& table_id, bool& has_or_not) = 0;
|
||||
|
||||
virtual Status add_group_file(TableFileSchema& group_file_info) = 0;
|
||||
virtual Status CreateTableFile(TableFileSchema& file_schema) = 0;
|
||||
virtual Status delete_group_partitions(const std::string& table_id,
|
||||
const meta::DatesT& dates) = 0;
|
||||
|
||||
|
|
|
@ -41,53 +41,53 @@ TEST_F(MetaTest, GROUP_TEST) {
|
|||
ASSERT_TRUE(!status.ok());
|
||||
}
|
||||
|
||||
TEST_F(MetaTest, GROUP_FILE_TEST) {
|
||||
TEST_F(MetaTest, table_file_TEST) {
|
||||
auto table_id = "meta_test_group";
|
||||
|
||||
meta::TableSchema group;
|
||||
group.table_id = table_id;
|
||||
auto status = impl_->CreateTable(group);
|
||||
|
||||
meta::TableFileSchema group_file;
|
||||
group_file.table_id = group.table_id;
|
||||
status = impl_->add_group_file(group_file);
|
||||
meta::TableFileSchema table_file;
|
||||
table_file.table_id = group.table_id;
|
||||
status = impl_->CreateTableFile(table_file);
|
||||
ASSERT_TRUE(status.ok());
|
||||
ASSERT_EQ(group_file.file_type, meta::TableFileSchema::NEW);
|
||||
ASSERT_EQ(table_file.file_type, meta::TableFileSchema::NEW);
|
||||
|
||||
auto file_id = group_file.file_id;
|
||||
auto file_id = table_file.file_id;
|
||||
|
||||
auto new_file_type = meta::TableFileSchema::INDEX;
|
||||
group_file.file_type = new_file_type;
|
||||
table_file.file_type = new_file_type;
|
||||
|
||||
status = impl_->update_group_file(group_file);
|
||||
status = impl_->update_group_file(table_file);
|
||||
ASSERT_TRUE(status.ok());
|
||||
ASSERT_EQ(group_file.file_type, new_file_type);
|
||||
ASSERT_EQ(table_file.file_type, new_file_type);
|
||||
|
||||
meta::DatesT dates;
|
||||
dates.push_back(meta::Meta::GetDate());
|
||||
status = impl_->delete_group_partitions(group_file.table_id, dates);
|
||||
status = impl_->delete_group_partitions(table_file.table_id, dates);
|
||||
ASSERT_FALSE(status.ok());
|
||||
|
||||
dates.clear();
|
||||
for (auto i=2; i < 10; ++i) {
|
||||
dates.push_back(meta::Meta::GetDateWithDelta(-1*i));
|
||||
}
|
||||
status = impl_->delete_group_partitions(group_file.table_id, dates);
|
||||
status = impl_->delete_group_partitions(table_file.table_id, dates);
|
||||
ASSERT_TRUE(status.ok());
|
||||
|
||||
group_file.date = meta::Meta::GetDateWithDelta(-2);
|
||||
status = impl_->update_group_file(group_file);
|
||||
table_file.date = meta::Meta::GetDateWithDelta(-2);
|
||||
status = impl_->update_group_file(table_file);
|
||||
ASSERT_TRUE(status.ok());
|
||||
ASSERT_EQ(group_file.date, meta::Meta::GetDateWithDelta(-2));
|
||||
ASSERT_FALSE(group_file.file_type == meta::TableFileSchema::TO_DELETE);
|
||||
ASSERT_EQ(table_file.date, meta::Meta::GetDateWithDelta(-2));
|
||||
ASSERT_FALSE(table_file.file_type == meta::TableFileSchema::TO_DELETE);
|
||||
|
||||
dates.clear();
|
||||
dates.push_back(group_file.date);
|
||||
status = impl_->delete_group_partitions(group_file.table_id, dates);
|
||||
dates.push_back(table_file.date);
|
||||
status = impl_->delete_group_partitions(table_file.table_id, dates);
|
||||
ASSERT_TRUE(status.ok());
|
||||
status = impl_->get_group_file(group_file.table_id, group_file.file_id, group_file);
|
||||
status = impl_->get_group_file(table_file.table_id, table_file.file_id, table_file);
|
||||
ASSERT_TRUE(status.ok());
|
||||
ASSERT_TRUE(group_file.file_type == meta::TableFileSchema::TO_DELETE);
|
||||
ASSERT_TRUE(table_file.file_type == meta::TableFileSchema::TO_DELETE);
|
||||
}
|
||||
|
||||
TEST_F(MetaTest, ARCHIVE_TEST_DAYS) {
|
||||
|
@ -107,19 +107,19 @@ TEST_F(MetaTest, ARCHIVE_TEST_DAYS) {
|
|||
auto status = impl.CreateTable(group);
|
||||
|
||||
meta::TableFilesSchema files;
|
||||
meta::TableFileSchema group_file;
|
||||
group_file.table_id = group.table_id;
|
||||
meta::TableFileSchema table_file;
|
||||
table_file.table_id = group.table_id;
|
||||
|
||||
auto cnt = 100;
|
||||
long ts = utils::GetMicroSecTimeStamp();
|
||||
std::vector<int> days;
|
||||
for (auto i=0; i<cnt; ++i) {
|
||||
status = impl.add_group_file(group_file);
|
||||
group_file.file_type = meta::TableFileSchema::NEW;
|
||||
status = impl.CreateTableFile(table_file);
|
||||
table_file.file_type = meta::TableFileSchema::NEW;
|
||||
int day = rand() % (days_num*2);
|
||||
group_file.created_on = ts - day*meta::D_SEC*meta::US_PS - 10000;
|
||||
status = impl.update_group_file(group_file);
|
||||
files.push_back(group_file);
|
||||
table_file.created_on = ts - day*meta::D_SEC*meta::US_PS - 10000;
|
||||
status = impl.update_group_file(table_file);
|
||||
files.push_back(table_file);
|
||||
days.push_back(day);
|
||||
}
|
||||
|
||||
|
@ -153,17 +153,17 @@ TEST_F(MetaTest, ARCHIVE_TEST_DISK) {
|
|||
auto status = impl.CreateTable(group);
|
||||
|
||||
meta::TableFilesSchema files;
|
||||
meta::TableFileSchema group_file;
|
||||
group_file.table_id = group.table_id;
|
||||
meta::TableFileSchema table_file;
|
||||
table_file.table_id = group.table_id;
|
||||
|
||||
auto cnt = 10;
|
||||
auto each_size = 2UL;
|
||||
for (auto i=0; i<cnt; ++i) {
|
||||
status = impl.add_group_file(group_file);
|
||||
group_file.file_type = meta::TableFileSchema::NEW;
|
||||
group_file.size = each_size * meta::G;
|
||||
status = impl.update_group_file(group_file);
|
||||
files.push_back(group_file);
|
||||
status = impl.CreateTableFile(table_file);
|
||||
table_file.file_type = meta::TableFileSchema::NEW;
|
||||
table_file.size = each_size * meta::G;
|
||||
status = impl.update_group_file(table_file);
|
||||
files.push_back(table_file);
|
||||
}
|
||||
|
||||
impl.archive_files();
|
||||
|
@ -183,7 +183,7 @@ TEST_F(MetaTest, ARCHIVE_TEST_DISK) {
|
|||
impl.drop_all();
|
||||
}
|
||||
|
||||
TEST_F(MetaTest, GROUP_FILES_TEST) {
|
||||
TEST_F(MetaTest, TABLE_FILES_TEST) {
|
||||
auto table_id = "meta_test_group";
|
||||
|
||||
meta::TableSchema group;
|
||||
|
@ -195,31 +195,31 @@ TEST_F(MetaTest, GROUP_FILES_TEST) {
|
|||
int to_index_files_cnt = 6;
|
||||
int index_files_cnt = 7;
|
||||
|
||||
meta::TableFileSchema group_file;
|
||||
group_file.table_id = group.table_id;
|
||||
meta::TableFileSchema table_file;
|
||||
table_file.table_id = group.table_id;
|
||||
|
||||
for (auto i=0; i<new_files_cnt; ++i) {
|
||||
status = impl_->add_group_file(group_file);
|
||||
group_file.file_type = meta::TableFileSchema::NEW;
|
||||
status = impl_->update_group_file(group_file);
|
||||
status = impl_->CreateTableFile(table_file);
|
||||
table_file.file_type = meta::TableFileSchema::NEW;
|
||||
status = impl_->update_group_file(table_file);
|
||||
}
|
||||
|
||||
for (auto i=0; i<raw_files_cnt; ++i) {
|
||||
status = impl_->add_group_file(group_file);
|
||||
group_file.file_type = meta::TableFileSchema::RAW;
|
||||
status = impl_->update_group_file(group_file);
|
||||
status = impl_->CreateTableFile(table_file);
|
||||
table_file.file_type = meta::TableFileSchema::RAW;
|
||||
status = impl_->update_group_file(table_file);
|
||||
}
|
||||
|
||||
for (auto i=0; i<to_index_files_cnt; ++i) {
|
||||
status = impl_->add_group_file(group_file);
|
||||
group_file.file_type = meta::TableFileSchema::TO_INDEX;
|
||||
status = impl_->update_group_file(group_file);
|
||||
status = impl_->CreateTableFile(table_file);
|
||||
table_file.file_type = meta::TableFileSchema::TO_INDEX;
|
||||
status = impl_->update_group_file(table_file);
|
||||
}
|
||||
|
||||
for (auto i=0; i<index_files_cnt; ++i) {
|
||||
status = impl_->add_group_file(group_file);
|
||||
group_file.file_type = meta::TableFileSchema::INDEX;
|
||||
status = impl_->update_group_file(group_file);
|
||||
status = impl_->CreateTableFile(table_file);
|
||||
table_file.file_type = meta::TableFileSchema::INDEX;
|
||||
status = impl_->update_group_file(table_file);
|
||||
}
|
||||
|
||||
meta::TableFilesSchema files;
|
||||
|
@ -231,15 +231,15 @@ TEST_F(MetaTest, GROUP_FILES_TEST) {
|
|||
meta::DatePartionedTableFilesSchema dated_files;
|
||||
status = impl_->files_to_merge(group.table_id, dated_files);
|
||||
ASSERT_TRUE(status.ok());
|
||||
ASSERT_EQ(dated_files[group_file.date].size(), raw_files_cnt);
|
||||
ASSERT_EQ(dated_files[table_file.date].size(), raw_files_cnt);
|
||||
|
||||
status = impl_->files_to_index(files);
|
||||
ASSERT_TRUE(status.ok());
|
||||
ASSERT_EQ(files.size(), to_index_files_cnt);
|
||||
|
||||
meta::DatesT dates = {group_file.date};
|
||||
meta::DatesT dates = {table_file.date};
|
||||
status = impl_->files_to_search(table_id, dates, dated_files);
|
||||
ASSERT_TRUE(status.ok());
|
||||
ASSERT_EQ(dated_files[group_file.date].size(),
|
||||
ASSERT_EQ(dated_files[table_file.date].size(),
|
||||
to_index_files_cnt+raw_files_cnt+index_files_cnt);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue