mirror of https://github.com/milvus-io/milvus.git
refine meta code (#1871)
* #1827 Combine request target vectors exceed max nq Signed-off-by: groot <yihua.mo@zilliz.com> * refine dbimpl Signed-off-by: groot <yihua.mo@zilliz.com> * refine dbimpl Signed-off-by: groot <yihua.mo@zilliz.com> * fix unittest failure Signed-off-by: groot <yihua.mo@zilliz.com> * merge master Signed-off-by: groot <yihua.mo@zilliz.com> * refine meta code Signed-off-by: groot <yihua.mo@zilliz.com> Co-authored-by: Jin Hai <hai.jin@zilliz.com>pull/1872/head
parent
0fd4b244c0
commit
6a960cda2c
|
@ -47,9 +47,14 @@ server_config:
|
|||
# | '*' means preload all existing tables (single-quote or | | |
|
||||
# | double-quote required). | | |
|
||||
#----------------------+------------------------------------------------------------+------------+-----------------+
|
||||
# auto_flush_interval | The interval, in seconds, at which Milvus automatically | Integer | 1 (s) |
|
||||
# | flushes data to disk. | | |
|
||||
# | 0 means disable the regular flush. | | |
|
||||
#----------------------+------------------------------------------------------------+------------+-----------------+
|
||||
db_config:
|
||||
backend_url: sqlite://:@:/
|
||||
preload_table:
|
||||
auto_flush_interval: 1
|
||||
|
||||
#----------------------+------------------------------------------------------------+------------+-----------------+
|
||||
# Storage Config | Description | Type | Default |
|
||||
|
|
|
@ -192,10 +192,6 @@ MemTableFile::Serialize(uint64_t wal_lsn) {
|
|||
table_file_schema_.file_type_ = meta::SegmentSchema::RAW;
|
||||
}
|
||||
|
||||
// Set collection file's flush_lsn so WAL can roll back and delete garbage files which can be obtained from
|
||||
// GetTableFilesByFlushLSN() in meta.
|
||||
table_file_schema_.flush_lsn_ = wal_lsn;
|
||||
|
||||
status = meta_->UpdateCollectionFile(table_file_schema_);
|
||||
|
||||
ENGINE_LOG_DEBUG << "New " << ((table_file_schema_.file_type_ == meta::SegmentSchema::RAW) ? "raw" : "to_index")
|
||||
|
|
|
@ -63,9 +63,6 @@ class Meta {
|
|||
virtual Status
|
||||
GetCollectionFlushLSN(const std::string& collection_id, uint64_t& flush_lsn) = 0;
|
||||
|
||||
virtual Status
|
||||
GetTableFilesByFlushLSN(uint64_t flush_lsn, SegmentsSchema& table_files) = 0;
|
||||
|
||||
virtual Status
|
||||
DropCollection(const std::string& collection_id) = 0;
|
||||
|
||||
|
|
|
@ -1042,77 +1042,6 @@ MySQLMetaImpl::GetCollectionFlushLSN(const std::string& collection_id, uint64_t&
|
|||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
MySQLMetaImpl::GetTableFilesByFlushLSN(uint64_t flush_lsn, SegmentsSchema& table_files) {
|
||||
table_files.clear();
|
||||
|
||||
try {
|
||||
server::MetricCollector metric;
|
||||
mysqlpp::StoreQueryResult res;
|
||||
{
|
||||
mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
|
||||
|
||||
if (connectionPtr == nullptr) {
|
||||
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
|
||||
}
|
||||
|
||||
mysqlpp::Query filesToIndexQuery = connectionPtr->query();
|
||||
filesToIndexQuery << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, file_size, "
|
||||
"row_count, date, created_on"
|
||||
<< " FROM " << META_TABLEFILES << " WHERE flush_lsn = " << flush_lsn << ";";
|
||||
|
||||
ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToIndex: " << filesToIndexQuery.str();
|
||||
|
||||
res = filesToIndexQuery.store();
|
||||
} // Scoped Connection
|
||||
|
||||
Status ret;
|
||||
std::map<std::string, CollectionSchema> groups;
|
||||
SegmentSchema table_file;
|
||||
for (auto& resRow : res) {
|
||||
table_file.id_ = resRow["id"]; // implicit conversion
|
||||
resRow["table_id"].to_string(table_file.collection_id_);
|
||||
resRow["segment_id"].to_string(table_file.segment_id_);
|
||||
table_file.engine_type_ = resRow["engine_type"];
|
||||
resRow["file_id"].to_string(table_file.file_id_);
|
||||
table_file.file_type_ = resRow["file_type"];
|
||||
table_file.file_size_ = resRow["file_size"];
|
||||
table_file.row_count_ = resRow["row_count"];
|
||||
table_file.date_ = resRow["date"];
|
||||
table_file.created_on_ = resRow["created_on"];
|
||||
|
||||
auto groupItr = groups.find(table_file.collection_id_);
|
||||
if (groupItr == groups.end()) {
|
||||
CollectionSchema table_schema;
|
||||
table_schema.collection_id_ = table_file.collection_id_;
|
||||
auto status = DescribeCollection(table_schema);
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
groups[table_file.collection_id_] = table_schema;
|
||||
}
|
||||
table_file.dimension_ = groups[table_file.collection_id_].dimension_;
|
||||
table_file.index_file_size_ = groups[table_file.collection_id_].index_file_size_;
|
||||
table_file.index_params_ = groups[table_file.collection_id_].index_params_;
|
||||
table_file.metric_type_ = groups[table_file.collection_id_].metric_type_;
|
||||
|
||||
auto status = utils::GetTableFilePath(options_, table_file);
|
||||
if (!status.ok()) {
|
||||
ret = status;
|
||||
}
|
||||
|
||||
table_files.push_back(table_file);
|
||||
}
|
||||
|
||||
if (res.size() > 0) {
|
||||
ENGINE_LOG_DEBUG << "Collect " << res.size() << " files with flush_lsn = " << flush_lsn;
|
||||
}
|
||||
return ret;
|
||||
} catch (std::exception& e) {
|
||||
return HandleException("GENERAL ERROR WHEN FINDING TABLE FILES BY LSN", e.what());
|
||||
}
|
||||
}
|
||||
|
||||
// ZR: this function assumes all fields in file_schema have value
|
||||
Status
|
||||
MySQLMetaImpl::UpdateCollectionFile(SegmentSchema& file_schema) {
|
||||
|
|
|
@ -71,9 +71,6 @@ class MySQLMetaImpl : public Meta {
|
|||
Status
|
||||
GetCollectionFlushLSN(const std::string& collection_id, uint64_t& flush_lsn) override;
|
||||
|
||||
Status
|
||||
GetTableFilesByFlushLSN(uint64_t flush_lsn, SegmentsSchema& table_files) override;
|
||||
|
||||
Status
|
||||
UpdateCollectionFile(SegmentSchema& file_schema) override;
|
||||
|
||||
|
|
|
@ -550,66 +550,6 @@ SqliteMetaImpl::GetCollectionFlushLSN(const std::string& collection_id, uint64_t
|
|||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
SqliteMetaImpl::GetTableFilesByFlushLSN(uint64_t flush_lsn, SegmentsSchema& table_files) {
|
||||
table_files.clear();
|
||||
|
||||
try {
|
||||
server::MetricCollector metric;
|
||||
|
||||
auto selected = ConnectorPtr->select(
|
||||
columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
|
||||
&SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
|
||||
&SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_,
|
||||
&SegmentSchema::created_on_),
|
||||
where(c(&SegmentSchema::flush_lsn_) == flush_lsn));
|
||||
|
||||
std::map<std::string, CollectionSchema> groups;
|
||||
SegmentSchema table_file;
|
||||
|
||||
Status ret;
|
||||
for (auto& file : selected) {
|
||||
table_file.id_ = std::get<0>(file);
|
||||
table_file.collection_id_ = std::get<1>(file);
|
||||
table_file.segment_id_ = std::get<2>(file);
|
||||
table_file.file_id_ = std::get<3>(file);
|
||||
table_file.file_type_ = std::get<4>(file);
|
||||
table_file.file_size_ = std::get<5>(file);
|
||||
table_file.row_count_ = std::get<6>(file);
|
||||
table_file.date_ = std::get<7>(file);
|
||||
table_file.engine_type_ = std::get<8>(file);
|
||||
table_file.created_on_ = std::get<9>(file);
|
||||
|
||||
auto status = utils::GetTableFilePath(options_, table_file);
|
||||
if (!status.ok()) {
|
||||
ret = status;
|
||||
}
|
||||
auto groupItr = groups.find(table_file.collection_id_);
|
||||
if (groupItr == groups.end()) {
|
||||
CollectionSchema table_schema;
|
||||
table_schema.collection_id_ = table_file.collection_id_;
|
||||
auto status = DescribeCollection(table_schema);
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
groups[table_file.collection_id_] = table_schema;
|
||||
}
|
||||
table_file.dimension_ = groups[table_file.collection_id_].dimension_;
|
||||
table_file.index_file_size_ = groups[table_file.collection_id_].index_file_size_;
|
||||
table_file.index_params_ = groups[table_file.collection_id_].index_params_;
|
||||
table_file.metric_type_ = groups[table_file.collection_id_].metric_type_;
|
||||
table_files.push_back(table_file);
|
||||
}
|
||||
|
||||
if (selected.size() > 0) {
|
||||
ENGINE_LOG_DEBUG << "Collect " << selected.size() << " files with flush_lsn = " << flush_lsn;
|
||||
}
|
||||
return ret;
|
||||
} catch (std::exception& e) {
|
||||
return HandleException("Encounter exception when getting collection files by flush_lsn", e.what());
|
||||
}
|
||||
}
|
||||
|
||||
Status
|
||||
SqliteMetaImpl::UpdateCollectionFile(SegmentSchema& file_schema) {
|
||||
file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
|
||||
|
|
|
@ -70,9 +70,6 @@ class SqliteMetaImpl : public Meta {
|
|||
Status
|
||||
GetCollectionFlushLSN(const std::string& collection_id, uint64_t& flush_lsn) override;
|
||||
|
||||
Status
|
||||
GetTableFilesByFlushLSN(uint64_t flush_lsn, SegmentsSchema& table_files) override;
|
||||
|
||||
Status
|
||||
UpdateCollectionFile(SegmentSchema& file_schema) override;
|
||||
|
||||
|
|
Loading…
Reference in New Issue