mirror of https://github.com/milvus-io/milvus.git
* return partition lsn Signed-off-by: yhmo <yihua.mo@zilliz.com> * fix wal lsn Signed-off-by: shengjun.li <shengjun.li@zilliz.com> * fix wal issue Signed-off-by: yhmo <yihua.mo@zilliz.com> * changelog Signed-off-by: yhmo <yihua.mo@zilliz.com> * typo Signed-off-by: yhmo <yihua.mo@zilliz.com> * all collection include partition Signed-off-by: yhmo <yihua.mo@zilliz.com> * fix build error Signed-off-by: yhmo <yihua.mo@zilliz.com> * fix flush Signed-off-by: shengjun.li <shengjun.li@zilliz.com> Co-authored-by: shengjun.li <shengjun.li@zilliz.com>pull/2396/head^2
parent
40055fcaf0
commit
4c9684366c
|
@ -4,6 +4,7 @@ Please mark all change in change log and use the issue from GitHub
|
|||
# Milvus 0.10.0 (TBD)
|
||||
|
||||
## Bug
|
||||
- \#2378 Duplicate data after server restart
|
||||
|
||||
## Feature
|
||||
- \#2363 Update branch version
|
||||
|
|
|
@ -487,7 +487,11 @@ DBImpl::CreatePartition(const std::string& collection_id, const std::string& par
|
|||
}
|
||||
|
||||
uint64_t lsn = 0;
|
||||
meta_ptr_->GetCollectionFlushLSN(collection_id, lsn);
|
||||
if (options_.wal_enable_) {
|
||||
lsn = wal_mgr_->CreatePartition(collection_id, partition_tag);
|
||||
} else {
|
||||
meta_ptr_->GetCollectionFlushLSN(collection_id, lsn);
|
||||
}
|
||||
return meta_ptr_->CreatePartition(collection_id, partition_name, partition_tag, lsn);
|
||||
}
|
||||
|
||||
|
@ -545,6 +549,10 @@ DBImpl::DropPartitionByTag(const std::string& collection_id, const std::string&
|
|||
return status;
|
||||
}
|
||||
|
||||
if (options_.wal_enable_) {
|
||||
wal_mgr_->DropPartition(collection_id, partition_tag);
|
||||
}
|
||||
|
||||
return DropPartition(partition_name);
|
||||
}
|
||||
|
||||
|
@ -891,7 +899,7 @@ DBImpl::Flush(const std::string& collection_id) {
|
|||
swn_wal_.Notify();
|
||||
flush_req_swn_.Wait();
|
||||
}
|
||||
|
||||
StartMergeTask();
|
||||
} else {
|
||||
LOG_ENGINE_DEBUG_ << "MemTable flush";
|
||||
InternalFlush(collection_id);
|
||||
|
@ -918,6 +926,7 @@ DBImpl::Flush() {
|
|||
swn_wal_.Notify();
|
||||
flush_req_swn_.Wait();
|
||||
}
|
||||
StartMergeTask();
|
||||
} else {
|
||||
LOG_ENGINE_DEBUG_ << "MemTable flush";
|
||||
InternalFlush();
|
||||
|
@ -1421,7 +1430,9 @@ DBImpl::DropIndex(const std::string& collection_id) {
|
|||
}
|
||||
|
||||
LOG_ENGINE_DEBUG_ << "Drop index for collection: " << collection_id;
|
||||
return DropCollectionIndexRecursively(collection_id);
|
||||
auto status = DropCollectionIndexRecursively(collection_id);
|
||||
StartMergeTask(); // merge small files after drop index
|
||||
return status;
|
||||
}
|
||||
|
||||
Status
|
||||
|
@ -2407,30 +2418,39 @@ Status
|
|||
DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
|
||||
fiu_return_on("DBImpl.ExexWalRecord.return", Status(););
|
||||
|
||||
auto collections_flushed = [&](const std::set<std::string>& collection_ids) -> uint64_t {
|
||||
if (collection_ids.empty()) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
auto collections_flushed = [&](const std::string collection_id,
|
||||
const std::set<std::string>& target_collection_names) -> uint64_t {
|
||||
uint64_t max_lsn = 0;
|
||||
if (options_.wal_enable_) {
|
||||
for (auto& collection : collection_ids) {
|
||||
uint64_t lsn = 0;
|
||||
uint64_t lsn = 0;
|
||||
for (auto& collection : target_collection_names) {
|
||||
meta_ptr_->GetCollectionFlushLSN(collection, lsn);
|
||||
wal_mgr_->CollectionFlushed(collection, lsn);
|
||||
if (lsn > max_lsn) {
|
||||
max_lsn = lsn;
|
||||
}
|
||||
}
|
||||
wal_mgr_->CollectionFlushed(collection_id, lsn);
|
||||
}
|
||||
|
||||
std::lock_guard<std::mutex> lck(merge_result_mutex_);
|
||||
for (auto& collection : collection_ids) {
|
||||
for (auto& collection : target_collection_names) {
|
||||
merge_collection_ids_.insert(collection);
|
||||
}
|
||||
return max_lsn;
|
||||
};
|
||||
|
||||
auto partition_flushed = [&](const std::string& collection_id, const std::string& partition,
|
||||
const std::string& target_collection_name) {
|
||||
if (options_.wal_enable_) {
|
||||
uint64_t lsn = 0;
|
||||
meta_ptr_->GetCollectionFlushLSN(target_collection_name, lsn);
|
||||
wal_mgr_->PartitionFlushed(collection_id, partition, lsn);
|
||||
}
|
||||
|
||||
std::lock_guard<std::mutex> lck(merge_result_mutex_);
|
||||
merge_collection_ids_.insert(target_collection_name);
|
||||
};
|
||||
|
||||
Status status;
|
||||
|
||||
switch (record.type) {
|
||||
|
@ -2447,7 +2467,9 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
|
|||
(record.data_size / record.length / sizeof(float)),
|
||||
(const float*)record.data, record.attr_nbytes, record.attr_data_size,
|
||||
record.attr_data, record.lsn, flushed_collections);
|
||||
collections_flushed(flushed_collections);
|
||||
if (!flushed_collections.empty()) {
|
||||
partition_flushed(record.collection_id, record.partition_tag, target_collection_name);
|
||||
}
|
||||
|
||||
milvus::server::CollectInsertMetrics metrics(record.length, status);
|
||||
break;
|
||||
|
@ -2465,7 +2487,9 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
|
|||
(record.data_size / record.length / sizeof(uint8_t)),
|
||||
(const u_int8_t*)record.data, record.lsn, flushed_collections);
|
||||
// even though !status.ok, run
|
||||
collections_flushed(flushed_collections);
|
||||
if (!flushed_collections.empty()) {
|
||||
partition_flushed(record.collection_id, record.partition_tag, target_collection_name);
|
||||
}
|
||||
|
||||
// metrics
|
||||
milvus::server::CollectInsertMetrics metrics(record.length, status);
|
||||
|
@ -2485,7 +2509,9 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
|
|||
(record.data_size / record.length / sizeof(float)),
|
||||
(const float*)record.data, record.lsn, flushed_collections);
|
||||
// even though !status.ok, run
|
||||
collections_flushed(flushed_collections);
|
||||
if (!flushed_collections.empty()) {
|
||||
partition_flushed(record.collection_id, record.partition_tag, target_collection_name);
|
||||
}
|
||||
|
||||
// metrics
|
||||
milvus::server::CollectInsertMetrics metrics(record.length, status);
|
||||
|
@ -2548,7 +2574,7 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
|
|||
flushed_collections.insert(collection_id);
|
||||
}
|
||||
|
||||
collections_flushed(flushed_collections);
|
||||
collections_flushed(record.collection_id, flushed_collections);
|
||||
|
||||
} else {
|
||||
// flush all collections
|
||||
|
@ -2558,7 +2584,7 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
|
|||
status = mem_mgr_->Flush(collection_ids);
|
||||
}
|
||||
|
||||
uint64_t lsn = collections_flushed(collection_ids);
|
||||
uint64_t lsn = collections_flushed("", collection_ids);
|
||||
if (options_.wal_enable_) {
|
||||
wal_mgr_->RemoveOldFiles(lsn);
|
||||
}
|
||||
|
|
|
@ -59,7 +59,7 @@ class Meta {
|
|||
HasCollection(const std::string& collection_id, bool& has_or_not, bool is_root = false) = 0;
|
||||
|
||||
virtual Status
|
||||
AllCollections(std::vector<CollectionSchema>& table_schema_array) = 0;
|
||||
AllCollections(std::vector<CollectionSchema>& table_schema_array, bool is_root = false) = 0;
|
||||
|
||||
virtual Status
|
||||
UpdateCollectionFlag(const std::string& collection_id, int64_t flag) = 0;
|
||||
|
|
|
@ -582,7 +582,7 @@ MySQLMetaImpl::HasCollection(const std::string& collection_id, bool& has_or_not,
|
|||
}
|
||||
|
||||
Status
|
||||
MySQLMetaImpl::AllCollections(std::vector<CollectionSchema>& collection_schema_array) {
|
||||
MySQLMetaImpl::AllCollections(std::vector<CollectionSchema>& collection_schema_array, bool is_root) {
|
||||
try {
|
||||
server::MetricCollector metric;
|
||||
mysqlpp::StoreQueryResult res;
|
||||
|
@ -599,8 +599,12 @@ MySQLMetaImpl::AllCollections(std::vector<CollectionSchema>& collection_schema_a
|
|||
mysqlpp::Query statement = connectionPtr->query();
|
||||
statement << "SELECT id, table_id, dimension, engine_type, index_params, index_file_size, metric_type"
|
||||
<< " ,owner_table, partition_tag, version, flush_lsn"
|
||||
<< " FROM " << META_TABLES << " WHERE state <> " << std::to_string(CollectionSchema::TO_DELETE)
|
||||
<< " AND owner_table = \"\";";
|
||||
<< " FROM " << META_TABLES << " WHERE state <> " << std::to_string(CollectionSchema::TO_DELETE);
|
||||
if (is_root) {
|
||||
statement << " AND owner_table = \"\";";
|
||||
} else {
|
||||
statement << ";";
|
||||
}
|
||||
|
||||
LOG_ENGINE_DEBUG_ << "AllCollections: " << statement.str();
|
||||
|
||||
|
@ -1535,8 +1539,8 @@ MySQLMetaImpl::ShowPartitions(const std::string& collection_id,
|
|||
|
||||
mysqlpp::Query statement = connectionPtr->query();
|
||||
statement << "SELECT table_id, id, state, dimension, created_on, flag, index_file_size,"
|
||||
<< " engine_type, index_params, metric_type, partition_tag, version FROM " << META_TABLES
|
||||
<< " WHERE owner_table = " << mysqlpp::quote << collection_id << " AND state <> "
|
||||
<< " engine_type, index_params, metric_type, partition_tag, version, flush_lsn FROM "
|
||||
<< META_TABLES << " WHERE owner_table = " << mysqlpp::quote << collection_id << " AND state <> "
|
||||
<< std::to_string(CollectionSchema::TO_DELETE) << ";";
|
||||
|
||||
LOG_ENGINE_DEBUG_ << "ShowPartitions: " << statement.str();
|
||||
|
@ -1559,6 +1563,7 @@ MySQLMetaImpl::ShowPartitions(const std::string& collection_id,
|
|||
partition_schema.owner_collection_ = collection_id;
|
||||
resRow["partition_tag"].to_string(partition_schema.partition_tag_);
|
||||
resRow["version"].to_string(partition_schema.version_);
|
||||
partition_schema.flush_lsn_ = resRow["flush_lsn"];
|
||||
|
||||
partition_schema_array.emplace_back(partition_schema);
|
||||
}
|
||||
|
@ -2755,6 +2760,7 @@ MySQLMetaImpl::SetGlobalLastLSN(uint64_t lsn) {
|
|||
}
|
||||
|
||||
bool first_create = false;
|
||||
uint64_t last_lsn = 0;
|
||||
{
|
||||
mysqlpp::StoreQueryResult res;
|
||||
mysqlpp::Query statement = connectionPtr->query();
|
||||
|
@ -2762,6 +2768,8 @@ MySQLMetaImpl::SetGlobalLastLSN(uint64_t lsn) {
|
|||
res = statement.store();
|
||||
if (res.num_rows() == 0) {
|
||||
first_create = true;
|
||||
} else {
|
||||
last_lsn = res[0]["global_lsn"];
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2773,7 +2781,7 @@ MySQLMetaImpl::SetGlobalLastLSN(uint64_t lsn) {
|
|||
if (!statement.exec()) {
|
||||
return HandleException("QUERY ERROR WHEN SET GLOBAL LSN", statement.error());
|
||||
}
|
||||
} else {
|
||||
} else if (lsn > last_lsn) {
|
||||
mysqlpp::Query statement = connectionPtr->query();
|
||||
statement << "UPDATE " << META_ENVIRONMENT << " SET global_lsn = " << lsn << ";";
|
||||
LOG_ENGINE_DEBUG_ << "SetGlobalLastLSN: " << statement.str();
|
||||
|
@ -2783,8 +2791,6 @@ MySQLMetaImpl::SetGlobalLastLSN(uint64_t lsn) {
|
|||
}
|
||||
}
|
||||
} // Scoped Connection
|
||||
|
||||
LOG_ENGINE_DEBUG_ << "Successfully update global_lsn: " << lsn;
|
||||
} catch (std::exception& e) {
|
||||
return HandleException("Failed to set global lsn", e.what());
|
||||
}
|
||||
|
|
|
@ -42,7 +42,7 @@ class MySQLMetaImpl : public Meta {
|
|||
HasCollection(const std::string& collection_id, bool& has_or_not, bool is_root = false) override;
|
||||
|
||||
Status
|
||||
AllCollections(std::vector<CollectionSchema>& collection_schema_array) override;
|
||||
AllCollections(std::vector<CollectionSchema>& collection_schema_array, bool is_root = false) override;
|
||||
|
||||
Status
|
||||
DropCollection(const std::string& collection_id) override;
|
||||
|
|
|
@ -309,29 +309,37 @@ SqliteMetaImpl::HasCollection(const std::string& collection_id, bool& has_or_not
|
|||
}
|
||||
|
||||
Status
|
||||
SqliteMetaImpl::AllCollections(std::vector<CollectionSchema>& collection_schema_array) {
|
||||
SqliteMetaImpl::AllCollections(std::vector<CollectionSchema>& collection_schema_array, bool is_root) {
|
||||
try {
|
||||
fiu_do_on("SqliteMetaImpl.AllCollections.throw_exception", throw std::exception());
|
||||
server::MetricCollector metric;
|
||||
|
||||
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
|
||||
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
|
||||
auto selected = ConnectorPtr->select(
|
||||
columns(&CollectionSchema::id_,
|
||||
&CollectionSchema::collection_id_,
|
||||
&CollectionSchema::dimension_,
|
||||
&CollectionSchema::created_on_,
|
||||
&CollectionSchema::flag_,
|
||||
&CollectionSchema::index_file_size_,
|
||||
&CollectionSchema::engine_type_,
|
||||
&CollectionSchema::index_params_,
|
||||
&CollectionSchema::metric_type_,
|
||||
&CollectionSchema::owner_collection_,
|
||||
&CollectionSchema::partition_tag_,
|
||||
&CollectionSchema::version_,
|
||||
&CollectionSchema::flush_lsn_),
|
||||
where(c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE
|
||||
and c(&CollectionSchema::owner_collection_) == ""));
|
||||
auto select_columns = columns(&CollectionSchema::id_,
|
||||
&CollectionSchema::collection_id_,
|
||||
&CollectionSchema::dimension_,
|
||||
&CollectionSchema::created_on_,
|
||||
&CollectionSchema::flag_,
|
||||
&CollectionSchema::index_file_size_,
|
||||
&CollectionSchema::engine_type_,
|
||||
&CollectionSchema::index_params_,
|
||||
&CollectionSchema::metric_type_,
|
||||
&CollectionSchema::owner_collection_,
|
||||
&CollectionSchema::partition_tag_,
|
||||
&CollectionSchema::version_,
|
||||
&CollectionSchema::flush_lsn_);
|
||||
decltype(ConnectorPtr->select(select_columns)) selected;
|
||||
|
||||
if (is_root) {
|
||||
selected = ConnectorPtr->select(select_columns,
|
||||
where(c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE
|
||||
and c(&CollectionSchema::owner_collection_) == ""));
|
||||
} else {
|
||||
selected = ConnectorPtr->select(select_columns,
|
||||
where(c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
|
||||
}
|
||||
|
||||
for (auto& collection : selected) {
|
||||
CollectionSchema schema;
|
||||
schema.id_ = std::get<0>(collection);
|
||||
|
@ -992,7 +1000,8 @@ SqliteMetaImpl::ShowPartitions(const std::string& collection_id,
|
|||
&CollectionSchema::metric_type_,
|
||||
&CollectionSchema::partition_tag_,
|
||||
&CollectionSchema::version_,
|
||||
&CollectionSchema::collection_id_),
|
||||
&CollectionSchema::collection_id_,
|
||||
&CollectionSchema::flush_lsn_),
|
||||
where(c(&CollectionSchema::owner_collection_) == collection_id and
|
||||
c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
|
||||
|
||||
|
@ -1011,6 +1020,7 @@ SqliteMetaImpl::ShowPartitions(const std::string& collection_id,
|
|||
partition_schema.partition_tag_ = std::get<9>(partitions[i]);
|
||||
partition_schema.version_ = std::get<10>(partitions[i]);
|
||||
partition_schema.collection_id_ = std::get<11>(partitions[i]);
|
||||
partition_schema.flush_lsn_ = std::get<12>(partitions[i]);
|
||||
partition_schema_array.emplace_back(partition_schema);
|
||||
}
|
||||
} catch (std::exception& e) {
|
||||
|
@ -1939,7 +1949,7 @@ SqliteMetaImpl::SetGlobalLastLSN(uint64_t lsn) {
|
|||
ConnectorPtr->insert(env);
|
||||
} else {
|
||||
uint64_t last_lsn = std::get<0>(selected[0]);
|
||||
if (lsn == last_lsn) {
|
||||
if (lsn <= last_lsn) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
|
|
@ -44,7 +44,7 @@ class SqliteMetaImpl : public Meta {
|
|||
HasCollection(const std::string& collection_id, bool& has_or_not, bool is_root = false) override;
|
||||
|
||||
Status
|
||||
AllCollections(std::vector<CollectionSchema>& collection_schema_array) override;
|
||||
AllCollections(std::vector<CollectionSchema>& collection_schema_array, bool is_root = false) override;
|
||||
|
||||
Status
|
||||
DropCollection(const std::string& collection_id) override;
|
||||
|
|
|
@ -62,35 +62,55 @@ WalManager::Init(const meta::MetaPtr& meta) {
|
|||
if (meta != nullptr) {
|
||||
meta->GetGlobalLastLSN(recovery_start);
|
||||
|
||||
std::vector<meta::CollectionSchema> table_schema_array;
|
||||
auto status = meta->AllCollections(table_schema_array);
|
||||
std::vector<meta::CollectionSchema> collention_schema_array;
|
||||
auto status = meta->AllCollections(collention_schema_array);
|
||||
if (!status.ok()) {
|
||||
return WAL_META_ERROR;
|
||||
}
|
||||
|
||||
if (!table_schema_array.empty()) {
|
||||
// get min and max flushed lsn
|
||||
uint64_t min_flused_lsn = table_schema_array[0].flush_lsn_;
|
||||
uint64_t max_flused_lsn = table_schema_array[0].flush_lsn_;
|
||||
for (size_t i = 1; i < table_schema_array.size(); i++) {
|
||||
if (min_flused_lsn > table_schema_array[i].flush_lsn_) {
|
||||
min_flused_lsn = table_schema_array[i].flush_lsn_;
|
||||
} else if (max_flused_lsn < table_schema_array[i].flush_lsn_) {
|
||||
max_flused_lsn = table_schema_array[i].flush_lsn_;
|
||||
if (!collention_schema_array.empty()) {
|
||||
u_int64_t min_flushed_lsn = ~(u_int64_t)0;
|
||||
u_int64_t max_flushed_lsn = 0;
|
||||
auto update_limit_lsn = [&](u_int64_t lsn) {
|
||||
if (min_flushed_lsn > lsn) {
|
||||
min_flushed_lsn = lsn;
|
||||
}
|
||||
if (max_flushed_lsn < lsn) {
|
||||
max_flushed_lsn = lsn;
|
||||
}
|
||||
};
|
||||
|
||||
for (auto& col_schema : collention_schema_array) {
|
||||
auto& collection = collections_[col_schema.collection_id_];
|
||||
auto& default_part = collection[""];
|
||||
default_part.flush_lsn = col_schema.flush_lsn_;
|
||||
update_limit_lsn(default_part.flush_lsn);
|
||||
|
||||
std::vector<meta::CollectionSchema> partition_schema_array;
|
||||
status = meta->ShowPartitions(col_schema.collection_id_, partition_schema_array);
|
||||
if (!status.ok()) {
|
||||
return WAL_META_ERROR;
|
||||
}
|
||||
for (auto& par_schema : partition_schema_array) {
|
||||
auto& partition = collection[par_schema.partition_tag_];
|
||||
partition.flush_lsn = par_schema.flush_lsn_;
|
||||
update_limit_lsn(partition.flush_lsn);
|
||||
}
|
||||
}
|
||||
if (applied_lsn < max_flused_lsn) {
|
||||
|
||||
if (applied_lsn < max_flushed_lsn) {
|
||||
// a new WAL folder?
|
||||
applied_lsn = max_flused_lsn;
|
||||
applied_lsn = max_flushed_lsn;
|
||||
}
|
||||
if (recovery_start < min_flused_lsn) {
|
||||
if (recovery_start < min_flushed_lsn) {
|
||||
// not flush all yet
|
||||
recovery_start = min_flused_lsn;
|
||||
recovery_start = min_flushed_lsn;
|
||||
}
|
||||
|
||||
for (auto& schema : table_schema_array) {
|
||||
TableLsn tb_lsn = {schema.flush_lsn_, applied_lsn};
|
||||
tables_[schema.collection_id_] = tb_lsn;
|
||||
for (auto& col : collections_) {
|
||||
for (auto& part : col.second) {
|
||||
part.second.wal_lsn = applied_lsn;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -141,9 +161,10 @@ WalManager::GetNextRecovery(MXLogRecord& record) {
|
|||
|
||||
// background thread has not started.
|
||||
// so, needn't lock here.
|
||||
auto it = tables_.find(record.collection_id);
|
||||
if (it != tables_.end()) {
|
||||
if (it->second.flush_lsn < record.lsn) {
|
||||
auto it_col = collections_.find(record.collection_id);
|
||||
if (it_col != collections_.end()) {
|
||||
auto it_part = it_col->second.find(record.partition_tag);
|
||||
if (it_part->second.flush_lsn < record.lsn) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -179,9 +200,10 @@ WalManager::GetNextEntityRecovery(milvus::engine::wal::MXLogRecord& record) {
|
|||
|
||||
// background thread has not started.
|
||||
// so, needn't lock here.
|
||||
auto it = tables_.find(record.collection_id);
|
||||
if (it != tables_.end()) {
|
||||
if (it->second.flush_lsn < record.lsn) {
|
||||
auto it_col = collections_.find(record.collection_id);
|
||||
if (it_col != collections_.end()) {
|
||||
auto it_part = it_col->second.find(record.partition_tag);
|
||||
if (it_part->second.flush_lsn < record.lsn) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -229,9 +251,10 @@ WalManager::GetNextRecord(MXLogRecord& record) {
|
|||
}
|
||||
|
||||
std::lock_guard<std::mutex> lck(mutex_);
|
||||
auto it = tables_.find(record.collection_id);
|
||||
if (it != tables_.end()) {
|
||||
if (it->second.flush_lsn < record.lsn) {
|
||||
auto it_col = collections_.find(record.collection_id);
|
||||
if (it_col != collections_.end()) {
|
||||
auto it_part = it_col->second.find(record.partition_tag);
|
||||
if (it_part->second.flush_lsn < record.lsn) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -275,9 +298,10 @@ WalManager::GetNextEntityRecord(milvus::engine::wal::MXLogRecord& record) {
|
|||
}
|
||||
|
||||
std::lock_guard<std::mutex> lck(mutex_);
|
||||
auto it = tables_.find(record.collection_id);
|
||||
if (it != tables_.end()) {
|
||||
if (it->second.flush_lsn < record.lsn) {
|
||||
auto it_col = collections_.find(record.collection_id);
|
||||
if (it_col != collections_.end()) {
|
||||
auto it_part = it_col->second.find(record.partition_tag);
|
||||
if (it_part->second.flush_lsn < record.lsn) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -293,7 +317,16 @@ WalManager::CreateCollection(const std::string& collection_id) {
|
|||
LOG_WAL_INFO_ << "create collection " << collection_id << " " << last_applied_lsn_;
|
||||
std::lock_guard<std::mutex> lck(mutex_);
|
||||
uint64_t applied_lsn = last_applied_lsn_;
|
||||
tables_[collection_id] = {applied_lsn, applied_lsn};
|
||||
collections_[collection_id][""] = {applied_lsn, applied_lsn};
|
||||
return applied_lsn;
|
||||
}
|
||||
|
||||
uint64_t
|
||||
WalManager::CreatePartition(const std::string& collection_id, const std::string& partition_tag) {
|
||||
LOG_WAL_INFO_ << "create collection " << collection_id << " " << partition_tag << " " << last_applied_lsn_;
|
||||
std::lock_guard<std::mutex> lck(mutex_);
|
||||
uint64_t applied_lsn = last_applied_lsn_;
|
||||
collections_[collection_id][partition_tag] = {applied_lsn, applied_lsn};
|
||||
return applied_lsn;
|
||||
}
|
||||
|
||||
|
@ -302,7 +335,7 @@ WalManager::CreateHybridCollection(const std::string& collection_id) {
|
|||
LOG_WAL_INFO_ << "create hybrid collection " << collection_id << " " << last_applied_lsn_;
|
||||
std::lock_guard<std::mutex> lck(mutex_);
|
||||
uint64_t applied_lsn = last_applied_lsn_;
|
||||
tables_[collection_id] = {applied_lsn, applied_lsn};
|
||||
collections_[collection_id][""] = {applied_lsn, applied_lsn};
|
||||
return applied_lsn;
|
||||
}
|
||||
|
||||
|
@ -310,21 +343,84 @@ void
|
|||
WalManager::DropCollection(const std::string& collection_id) {
|
||||
LOG_WAL_INFO_ << "drop collection " << collection_id;
|
||||
std::lock_guard<std::mutex> lck(mutex_);
|
||||
tables_.erase(collection_id);
|
||||
collections_.erase(collection_id);
|
||||
}
|
||||
|
||||
void
|
||||
WalManager::DropPartition(const std::string& collection_id, const std::string& partition_tag) {
|
||||
LOG_WAL_INFO_ << collection_id << " drop partition " << partition_tag;
|
||||
std::lock_guard<std::mutex> lck(mutex_);
|
||||
auto it = collections_.find(collection_id);
|
||||
if (it != collections_.end()) {
|
||||
it->second.erase(partition_tag);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
WalManager::CollectionFlushed(const std::string& collection_id, uint64_t lsn) {
|
||||
std::unique_lock<std::mutex> lck(mutex_);
|
||||
auto it = tables_.find(collection_id);
|
||||
if (it != tables_.end()) {
|
||||
it->second.flush_lsn = lsn;
|
||||
if (collection_id.empty()) {
|
||||
// all collections
|
||||
for (auto& col : collections_) {
|
||||
for (auto& part : col.second) {
|
||||
part.second.flush_lsn = lsn;
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
// one collection
|
||||
auto it_col = collections_.find(collection_id);
|
||||
if (it_col != collections_.end()) {
|
||||
for (auto& part : it_col->second) {
|
||||
part.second.flush_lsn = lsn;
|
||||
}
|
||||
}
|
||||
}
|
||||
lck.unlock();
|
||||
|
||||
LOG_WAL_INFO_ << collection_id << " is flushed by lsn " << lsn;
|
||||
}
|
||||
|
||||
void
|
||||
WalManager::PartitionFlushed(const std::string& collection_id, const std::string& partition_tag, uint64_t lsn) {
|
||||
std::unique_lock<std::mutex> lck(mutex_);
|
||||
auto it_col = collections_.find(collection_id);
|
||||
if (it_col != collections_.end()) {
|
||||
auto it_part = it_col->second.find(partition_tag);
|
||||
if (it_part != it_col->second.end()) {
|
||||
it_part->second.flush_lsn = lsn;
|
||||
}
|
||||
}
|
||||
lck.unlock();
|
||||
|
||||
LOG_WAL_INFO_ << collection_id << " " << partition_tag << " is flushed by lsn " << lsn;
|
||||
}
|
||||
|
||||
void
|
||||
WalManager::CollectionUpdated(const std::string& collection_id, uint64_t lsn) {
|
||||
std::unique_lock<std::mutex> lck(mutex_);
|
||||
auto it_col = collections_.find(collection_id);
|
||||
if (it_col != collections_.end()) {
|
||||
for (auto& part : it_col->second) {
|
||||
part.second.wal_lsn = lsn;
|
||||
}
|
||||
}
|
||||
lck.unlock();
|
||||
}
|
||||
|
||||
void
|
||||
WalManager::PartitionUpdated(const std::string& collection_id, const std::string& partition_tag, uint64_t lsn) {
|
||||
std::unique_lock<std::mutex> lck(mutex_);
|
||||
auto it_col = collections_.find(collection_id);
|
||||
if (it_col != collections_.end()) {
|
||||
auto it_part = it_col->second.find(partition_tag);
|
||||
if (it_part != it_col->second.end()) {
|
||||
it_part->second.wal_lsn = lsn;
|
||||
}
|
||||
}
|
||||
lck.unlock();
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
bool
|
||||
WalManager::Insert(const std::string& collection_id, const std::string& partition_tag, const IDNumbers& vector_ids,
|
||||
|
@ -380,13 +476,8 @@ WalManager::Insert(const std::string& collection_id, const std::string& partitio
|
|||
new_lsn = record.lsn;
|
||||
}
|
||||
|
||||
std::unique_lock<std::mutex> lck(mutex_);
|
||||
last_applied_lsn_ = new_lsn;
|
||||
auto it = tables_.find(collection_id);
|
||||
if (it != tables_.end()) {
|
||||
it->second.wal_lsn = new_lsn;
|
||||
}
|
||||
lck.unlock();
|
||||
PartitionUpdated(collection_id, partition_tag, new_lsn);
|
||||
|
||||
LOG_WAL_INFO_ << LogOut("[%s][%ld]", "insert", 0) << collection_id << " insert in part " << partition_tag
|
||||
<< " with lsn " << new_lsn;
|
||||
|
@ -472,13 +563,8 @@ WalManager::InsertEntities(const std::string& collection_id, const std::string&
|
|||
new_lsn = record.lsn;
|
||||
}
|
||||
|
||||
std::unique_lock<std::mutex> lck(mutex_);
|
||||
last_applied_lsn_ = new_lsn;
|
||||
auto it = tables_.find(collection_id);
|
||||
if (it != tables_.end()) {
|
||||
it->second.wal_lsn = new_lsn;
|
||||
}
|
||||
lck.unlock();
|
||||
PartitionUpdated(collection_id, partition_tag, new_lsn);
|
||||
|
||||
LOG_WAL_INFO_ << LogOut("[%s][%ld]", "insert", 0) << collection_id << " insert in part " << partition_tag
|
||||
<< " with lsn " << new_lsn;
|
||||
|
@ -525,13 +611,8 @@ WalManager::DeleteById(const std::string& collection_id, const IDNumbers& vector
|
|||
new_lsn = record.lsn;
|
||||
}
|
||||
|
||||
std::unique_lock<std::mutex> lck(mutex_);
|
||||
last_applied_lsn_ = new_lsn;
|
||||
auto it = tables_.find(collection_id);
|
||||
if (it != tables_.end()) {
|
||||
it->second.wal_lsn = new_lsn;
|
||||
}
|
||||
lck.unlock();
|
||||
CollectionUpdated(collection_id, new_lsn);
|
||||
|
||||
LOG_WAL_INFO_ << collection_id << " delete rows by id, lsn " << new_lsn;
|
||||
|
||||
|
@ -548,19 +629,25 @@ WalManager::Flush(const std::string& collection_id) {
|
|||
uint64_t lsn = 0;
|
||||
if (collection_id.empty()) {
|
||||
// flush all tables
|
||||
for (auto& it : tables_) {
|
||||
if (it.second.wal_lsn > it.second.flush_lsn) {
|
||||
lsn = last_applied_lsn_;
|
||||
break;
|
||||
for (auto& col : collections_) {
|
||||
for (auto& part : col.second) {
|
||||
if (part.second.wal_lsn > part.second.flush_lsn) {
|
||||
lsn = last_applied_lsn_;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
// flush one collection
|
||||
auto it = tables_.find(collection_id);
|
||||
if (it != tables_.end()) {
|
||||
if (it->second.wal_lsn > it->second.flush_lsn) {
|
||||
lsn = it->second.wal_lsn;
|
||||
auto it_col = collections_.find(collection_id);
|
||||
if (it_col != collections_.end()) {
|
||||
for (auto& part : it_col->second) {
|
||||
auto wal_lsn = part.second.wal_lsn;
|
||||
auto flush_lsn = part.second.flush_lsn;
|
||||
if (wal_lsn > flush_lsn && wal_lsn > lsn) {
|
||||
lsn = wal_lsn;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -62,6 +62,7 @@ class WalManager {
|
|||
|
||||
ErrorCode
|
||||
GetNextEntityRecord(MXLogRecord& record);
|
||||
|
||||
/*
|
||||
* Create collection
|
||||
* @param collection_id: collection id
|
||||
|
@ -70,6 +71,15 @@ class WalManager {
|
|||
uint64_t
|
||||
CreateCollection(const std::string& collection_id);
|
||||
|
||||
/*
|
||||
* Create partition
|
||||
* @param collection_id: collection id
|
||||
* @param partition_tag: partition tag
|
||||
* @retval lsn
|
||||
*/
|
||||
uint64_t
|
||||
CreatePartition(const std::string& collection_id, const std::string& partition_tag);
|
||||
|
||||
/*
|
||||
* Create hybrid collection
|
||||
* @param collection_id: collection id
|
||||
|
@ -87,13 +97,49 @@ class WalManager {
|
|||
DropCollection(const std::string& collection_id);
|
||||
|
||||
/*
|
||||
* Collection is flushed
|
||||
* Drop partition
|
||||
* @param collection_id: collection id
|
||||
* @param partition_tag: partition tag
|
||||
* @retval none
|
||||
*/
|
||||
void
|
||||
DropPartition(const std::string& collection_id, const std::string& partition_tag);
|
||||
|
||||
/*
|
||||
* Collection is flushed (update flushed_lsn)
|
||||
* @param collection_id: collection id
|
||||
* @param lsn: flushed lsn
|
||||
*/
|
||||
void
|
||||
CollectionFlushed(const std::string& collection_id, uint64_t lsn);
|
||||
|
||||
/*
|
||||
* Partition is flushed (update flushed_lsn)
|
||||
* @param collection_id: collection id
|
||||
* @param partition_tag: partition_tag
|
||||
* @param lsn: flushed lsn
|
||||
*/
|
||||
void
|
||||
PartitionFlushed(const std::string& collection_id, const std::string& partition_tag, uint64_t lsn);
|
||||
|
||||
/*
|
||||
* Collection is updated (update wal_lsn)
|
||||
* @param collection_id: collection id
|
||||
* @param partition_tag: partition_tag
|
||||
* @param lsn: flushed lsn
|
||||
*/
|
||||
void
|
||||
CollectionUpdated(const std::string& collection_id, uint64_t lsn);
|
||||
|
||||
/*
|
||||
* Partition is updated (update wal_lsn)
|
||||
* @param collection_id: collection id
|
||||
* @param partition_tag: partition_tag
|
||||
* @param lsn: flushed lsn
|
||||
*/
|
||||
void
|
||||
PartitionUpdated(const std::string& collection_id, const std::string& partition_tag, uint64_t lsn);
|
||||
|
||||
/*
|
||||
* Insert
|
||||
* @param collection_id: collection id
|
||||
|
@ -155,7 +201,7 @@ class WalManager {
|
|||
uint64_t wal_lsn;
|
||||
};
|
||||
std::mutex mutex_;
|
||||
std::map<std::string, TableLsn> tables_;
|
||||
std::map<std::string, std::map<std::string, TableLsn>> collections_;
|
||||
std::atomic<uint64_t> last_applied_lsn_;
|
||||
|
||||
// if multi-thread call Flush(), use list
|
||||
|
|
|
@ -60,7 +60,7 @@ class TestWalMeta : public SqliteMetaImpl {
|
|||
}
|
||||
|
||||
Status
|
||||
AllCollections(std::vector<CollectionSchema>& table_schema_array) override {
|
||||
AllCollections(std::vector<CollectionSchema>& table_schema_array, bool is_root) override {
|
||||
table_schema_array = tables_;
|
||||
return Status::OK();
|
||||
}
|
||||
|
@ -88,7 +88,7 @@ class TestWalMetaError : public SqliteMetaImpl {
|
|||
}
|
||||
|
||||
Status
|
||||
AllCollections(std::vector<CollectionSchema>& table_schema_array) override {
|
||||
AllCollections(std::vector<CollectionSchema>& table_schema_array, bool is_root) override {
|
||||
return Status(DB_ERROR, "error");
|
||||
}
|
||||
};
|
||||
|
|
Loading…
Reference in New Issue