fix auto-generated id field bug (#3137)

* autogen id

Signed-off-by: yhmo <yihua.mo@zilliz.com>

* fix bug

Signed-off-by: yhmo <yihua.mo@zilliz.com>

Co-authored-by: Wang Xiangyu <xy.wang@zilliz.com>
pull/3142/head^2
groot 2020-08-05 18:19:51 +08:00 committed by GitHub
parent e4a70abec3
commit 2b2ebc27e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 129 additions and 220 deletions

View File

@ -66,15 +66,6 @@ DBImpl::DBImpl(const DBOptions& options)
mem_mgr_ = MemManagerFactory::Build(options_);
merge_mgr_ptr_ = MergeManagerFactory::SSBuild(options_);
if (options_.wal_enable_) {
// wal::MXLogConfiguration mxlog_config;
// mxlog_config.recovery_error_ignore = options_.recovery_error_ignore_;
// // 2 buffers in the WAL
// mxlog_config.buffer_size = options_.buffer_size_ / 2;
// mxlog_config.mxlog_path = options_.mxlog_path_;
// wal_mgr_ = std::make_shared<wal::WalManager>(mxlog_config);
}
/* watch on storage.auto_flush_interval */
ConfigMgr::GetInstance().Attach("storage.auto_flush_interval", this);
@ -112,41 +103,10 @@ DBImpl::Start() {
// TODO: merge files
// wal
if (options_.wal_enable_) {
return Status(SERVER_NOT_IMPLEMENT, "Wal not implemented");
// auto error_code = DB_ERROR;
// if (wal_mgr_ != nullptr) {
// error_code = wal_mgr_->Init();
// }
// if (error_code != WAL_SUCCESS) {
// throw Exception(error_code, "Wal init error!");
// }
//
// // recovery
// while (true) {
// wal::MXLogRecord record;
// auto error_code = wal_mgr_->GetNextRecovery(record);
// if (error_code != WAL_SUCCESS) {
// throw Exception(error_code, "Wal recovery error!");
// }
// if (record.type == wal::MXLogType::None) {
// break;
// }
// ExecWalRecord(record);
// }
//
// // for distribute version, some nodes are read only
// if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
// // background wal thread
// bg_wal_thread_ = std::thread(&SSDBImpl::TimingWalThread, this);
// }
} else {
// for distribute version, some nodes are read only
if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
// background flush thread
bg_flush_thread_ = std::thread(&DBImpl::TimingFlushThread, this);
}
// for distribute version, some nodes are read only
if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
// background flush thread
bg_flush_thread_ = std::thread(&DBImpl::TimingFlushThread, this);
}
// for distribute version, some nodes are read only
@ -173,20 +133,14 @@ DBImpl::Stop() {
initialized_.store(false, std::memory_order_release);
if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
if (options_.wal_enable_) {
// // wait wal thread finish
// swn_wal_.Notify();
// bg_wal_thread_.join();
} else {
// flush all without merge
wal::MXLogRecord record;
record.type = wal::MXLogType::Flush;
ExecWalRecord(record);
// flush all without merge
wal::MXLogRecord record;
record.type = wal::MXLogType::Flush;
ExecWalRecord(record);
// wait flush thread finish
swn_flush_.Notify();
bg_flush_thread_.join();
}
// wait flush thread finish
swn_flush_.Notify();
bg_flush_thread_.join();
WaitMergeFileFinish();
@ -212,36 +166,35 @@ DBImpl::CreateCollection(const snapshot::CreateCollectionContext& context) {
CHECK_INITIALIZED;
auto ctx = context;
// check uid params
bool has_uid = false;
// default id is auto-generated
auto params = ctx.collection->GetParams();
if (params.find(PARAM_UID_AUTOGEN) == params.end()) {
params[PARAM_UID_AUTOGEN] = true;
ctx.collection->SetParams(params);
}
// check uid existence
snapshot::FieldPtr uid_field;
for (auto& pair : ctx.fields_schema) {
if (pair.first->GetName() == DEFAULT_UID_NAME) {
has_uid = true;
json params = pair.first->GetParams();
if (params.find(PARAM_UID_AUTOGEN) == params.end()) {
params[PARAM_UID_AUTOGEN] = true;
pair.first->SetParams(params);
}
uid_field = pair.first;
break;
}
}
// add uid field if not specified
if (!has_uid) {
json params;
params[PARAM_UID_AUTOGEN] = true;
auto uid_field = std::make_shared<snapshot::Field>(DEFAULT_UID_NAME, 0, DataType::INT64, params);
auto bloom_filter_element = std::make_shared<snapshot::FieldElement>(
0, 0, DEFAULT_BLOOM_FILTER_NAME, milvus::engine::FieldElementType::FET_BLOOM_FILTER);
auto delete_doc_element = std::make_shared<snapshot::FieldElement>(
0, 0, DEFAULT_DELETED_DOCS_NAME, milvus::engine::FieldElementType::FET_DELETED_DOCS);
ctx.fields_schema[uid_field] = {bloom_filter_element, delete_doc_element};
if (uid_field == nullptr) {
uid_field = std::make_shared<snapshot::Field>(DEFAULT_UID_NAME, 0, DataType::INT64);
}
if (options_.wal_enable_) {
// ctx.lsn = wal_mgr_->CreateCollection(context.collection->GetName());
}
// define uid elements
auto bloom_filter_element = std::make_shared<snapshot::FieldElement>(
0, 0, DEFAULT_BLOOM_FILTER_NAME, milvus::engine::FieldElementType::FET_BLOOM_FILTER);
auto delete_doc_element = std::make_shared<snapshot::FieldElement>(
0, 0, DEFAULT_DELETED_DOCS_NAME, milvus::engine::FieldElementType::FET_DELETED_DOCS);
ctx.fields_schema[uid_field] = {bloom_filter_element, delete_doc_element};
auto op = std::make_shared<snapshot::CreateCollectionOperation>(ctx);
return op->Push();
}
@ -256,11 +209,6 @@ DBImpl::DropCollection(const std::string& name) {
auto& snapshots = snapshot::Snapshots::GetInstance();
STATUS_CHECK(snapshots.GetSnapshot(ss, name));
if (options_.wal_enable_) {
// SS TODO
/* wal_mgr_->DropCollection(ss->GetCollectionId()); */
}
mem_mgr_->EraseMem(ss->GetCollectionId()); // not allow insert
return snapshots.DropCollection(ss->GetCollectionId(), std::numeric_limits<snapshot::LSN_TYPE>::max());
@ -328,11 +276,6 @@ DBImpl::CreatePartition(const std::string& collection_name, const std::string& p
STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));
snapshot::LSN_TYPE lsn = 0;
if (options_.wal_enable_) {
// SS TODO
/* lsn = wal_mgr_->CreatePartition(collection_name, partition_tag); */
}
snapshot::OperationContext context;
context.lsn = lsn;
auto op = std::make_shared<snapshot::CreatePartitionOperation>(context, ss);
@ -492,24 +435,21 @@ DBImpl::Insert(const std::string& collection_name, const std::string& partition_
return Status(DB_ERROR, "Field '_id' not found");
}
auto& params = id_field->GetParams();
auto& params = ss->GetCollection()->GetParams();
bool auto_increment = true;
if (params.find(PARAM_UID_AUTOGEN) != params.end()) {
auto_increment = params[PARAM_UID_AUTOGEN];
}
// id is auto increment, but client provides id, return error
FIXEDX_FIELD_MAP& fields = data_chunk->fixed_fields_;
auto pair = fields.find(engine::DEFAULT_UID_NAME);
if (auto_increment) {
auto pair = fields.find(engine::DEFAULT_UID_NAME);
// id is auto increment, but client provides id, return error
if (pair != fields.end() && pair->second != nullptr) {
return Status(DB_ERROR, "Field '_id' is auto increment, no need to provide id");
}
}
// id is not auto increment, but client doesn't provide id, return error
if (!auto_increment) {
auto pair = fields.find(engine::DEFAULT_UID_NAME);
} else {
// id is not auto increment, but client doesn't provide id, return error
if (pair == fields.end() || pair->second == nullptr) {
return Status(DB_ERROR, "Field '_id' is user defined");
}
@ -526,31 +466,16 @@ DBImpl::Insert(const std::string& collection_name, const std::string& partition_
data_chunk->fixed_fields_[engine::DEFAULT_UID_NAME] = id_data;
}
if (options_.wal_enable_) {
return Status(SERVER_NOT_IMPLEMENT, "Wal not implemented");
// auto vector_it = entity.vector_data_.begin();
// if (!vector_it->second.binary_data_.empty()) {
// wal_mgr_->InsertEntities(collection_name, partition_name, entity.id_array_,
// vector_it->second.binary_data_,
// attr_nbytes, attr_data);
// } else if (!vector_it->second.float_data_.empty()) {
// wal_mgr_->InsertEntities(collection_name, partition_name, entity.id_array_,
// vector_it->second.float_data_,
// attr_nbytes, attr_data);
// }
// swn_wal_.Notify();
} else {
// insert entities: collection_name is field id
wal::MXLogRecord record;
record.lsn = 0;
record.collection_id = collection_name;
record.partition_tag = partition_name;
record.data_chunk = data_chunk;
record.length = data_chunk->count_;
record.type = wal::MXLogType::Entity;
// insert entities: collection_name is field id
wal::MXLogRecord record;
record.lsn = 0;
record.collection_id = collection_name;
record.partition_tag = partition_name;
record.data_chunk = data_chunk;
record.length = data_chunk->count_;
record.type = wal::MXLogType::Entity;
STATUS_CHECK(ExecWalRecord(record));
}
STATUS_CHECK(ExecWalRecord(record));
return Status::OK();
}
@ -580,20 +505,14 @@ DBImpl::DeleteEntityByID(const std::string& collection_name, const engine::IDNum
CHECK_INITIALIZED;
Status status;
if (options_.wal_enable_) {
return Status(SERVER_NOT_IMPLEMENT, "Wal not implemented");
// wal_mgr_->DeleteById(collection_name, entity_ids);
// swn_wal_.Notify();
} else {
wal::MXLogRecord record;
record.lsn = 0; // need to get from meta ?
record.type = wal::MXLogType::Delete;
record.collection_id = collection_name;
record.ids = entity_ids.data();
record.length = entity_ids.size();
wal::MXLogRecord record;
record.lsn = 0; // need to get from meta ?
record.type = wal::MXLogType::Delete;
record.collection_id = collection_name;
record.ids = entity_ids.data();
record.length = entity_ids.size();
status = ExecWalRecord(record);
}
status = ExecWalRecord(record);
return status;
}
@ -748,24 +667,7 @@ DBImpl::Flush(const std::string& collection_name) {
}
LOG_ENGINE_DEBUG_ << "Begin flush collection: " << collection_name;
if (options_.wal_enable_) {
return Status(SERVER_NOT_IMPLEMENT, "Wal not implemented");
// LOG_ENGINE_DEBUG_ << "WAL flush";
// auto lsn = wal_mgr_->Flush(collection_name);
// if (lsn != 0) {
// swn_wal_.Notify();
// flush_req_swn_.Wait();
// } else {
// // no collection flushed, call merge task to cleanup files
// std::set<std::string> merge_collection_names;
// StartMergeTask(merge_collection_names);
// }
} else {
LOG_ENGINE_DEBUG_ << "MemTable flush";
InternalFlush(collection_name);
}
InternalFlush(collection_name);
LOG_ENGINE_DEBUG_ << "End flush collection: " << collection_name;
return status;
@ -778,29 +680,10 @@ DBImpl::Flush() {
}
LOG_ENGINE_DEBUG_ << "Begin flush all collections";
Status status;
fiu_do_on("options_wal_enable_false", options_.wal_enable_ = false);
if (options_.wal_enable_) {
return Status(SERVER_NOT_IMPLEMENT, "Wal not implemented");
// LOG_ENGINE_DEBUG_ << "WAL flush";
// auto lsn = wal_mgr_->Flush();
// if (lsn != 0) {
// swn_wal_.Notify();
// flush_req_swn_.Wait();
// } else {
// // no collection flushed, call merge task to cleanup files
// std::set<std::string> merge_collection_names;
// StartMergeTask(merge_collection_names);
// }
} else {
LOG_ENGINE_DEBUG_ << "MemTable flush";
InternalFlush();
}
InternalFlush();
LOG_ENGINE_DEBUG_ << "End flush all collections";
return status;
return Status::OK();
}
Status
@ -1099,30 +982,6 @@ DBImpl::TimingWalThread() {
Status
DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
auto collections_flushed = [&](const std::string& collection_name,
const std::set<std::string>& target_collection_names) -> uint64_t {
uint64_t max_lsn = 0;
if (options_.wal_enable_ && !target_collection_names.empty()) {
// uint64_t lsn = 0;
// for (auto& collection_name : target_collection_names) {
// snapshot::ScopedSnapshotT ss;
// snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name);
// lsn = ss->GetMaxLsn();
// if (lsn > max_lsn) {
// max_lsn = lsn;
// }
// }
// wal_mgr_->CollectionFlushed(collection_name, lsn);
}
std::set<std::string> merge_collection_names;
for (auto& collection : target_collection_names) {
merge_collection_names.insert(collection);
}
StartMergeTask(merge_collection_names);
return max_lsn;
};
auto force_flush_if_mem_full = [&]() -> void {
if (mem_mgr_->GetCurrentMem() > options_.insert_buffer_size_) {
LOG_ENGINE_DEBUG_ << LogOut("[%s][%ld] ", "insert", 0) << "Insert buffer size exceeds limit. Force flush";
@ -1201,14 +1060,15 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
}
const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
int64_t collection_name = ss->GetCollectionId();
status = mem_mgr_->Flush(collection_name);
int64_t collection_id = ss->GetCollectionId();
status = mem_mgr_->Flush(collection_id);
if (!status.ok()) {
return status;
}
std::set<std::string> flushed_collections;
collections_flushed(record.collection_id, flushed_collections);
flushed_collections.insert(record.collection_id);
StartMergeTask(flushed_collections);
} else {
// flush all collections
@ -1230,10 +1090,7 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
flushed_collections.insert(ss->GetName());
}
uint64_t lsn = collections_flushed("", flushed_collections);
if (options_.wal_enable_) {
// wal_mgr_->RemoveOldFiles(lsn);
}
StartMergeTask(flushed_collections);
}
break;
}

View File

@ -16,7 +16,7 @@ namespace milvus {
namespace engine {
const char* DEFAULT_UID_NAME = "_id";
const char* PARAM_UID_AUTOGEN = "auto_increment";
const char* PARAM_UID_AUTOGEN = "auto_id";
const char* DEFAULT_RAW_DATA_NAME = "_raw";
const char* DEFAULT_BLOOM_FILTER_NAME = "_blf";

View File

@ -57,6 +57,12 @@ message FieldName {
/**
* @brief Collection mapping
* @extra_params: key-value pair for extra parameters of the collection
* typically usage:
* extra_params["params"] = {segment_row_count: 1000000, auto_id: true}
* Note:
* the segment_row_count specify segment row count limit for merging
* the auto_id = true means entity id is auto-generated by milvus
*/
message Mapping {
Status status = 1;

View File

@ -471,27 +471,73 @@ TEST_F(DBTest, QueryTest) {
}
TEST_F(DBTest, InsertTest) {
std::string collection_name = "MERGE_TEST";
auto status = CreateCollection2(db_, collection_name, 0);
ASSERT_TRUE(status.ok());
auto do_insert = [&](bool autogen_id, bool provide_id) -> void {
CreateCollectionContext context;
context.lsn = 0;
std::string collection_name = "INSERT_TEST";
auto collection_schema = std::make_shared<Collection>(collection_name);
milvus::json params;
params[milvus::engine::PARAM_UID_AUTOGEN] = autogen_id;
collection_schema->SetParams(params);
context.collection = collection_schema;
status = db_->Flush();
ASSERT_TRUE(status.ok());
std::string field_name = "field_0";
auto field = std::make_shared<Field>(field_name, 0, milvus::engine::DataType::INT32);
context.fields_schema[field] = {};
const uint64_t entity_count = 100;
milvus::engine::DataChunkPtr data_chunk;
BuildEntities(entity_count, 0, data_chunk);
field = std::make_shared<Field>(milvus::engine::DEFAULT_UID_NAME, 0, milvus::engine::DataType::INT64);
context.fields_schema[field] = {};
status = db_->Insert(collection_name, "", data_chunk);
ASSERT_TRUE(status.ok());
auto status = db_->CreateCollection(context);
status = db_->Flush();
ASSERT_TRUE(status.ok());
milvus::engine::DataChunkPtr data_chunk = std::make_shared<milvus::engine::DataChunk>();
data_chunk->count_ = 100;
if (provide_id) {
milvus::engine::BinaryDataPtr raw = std::make_shared<milvus::engine::BinaryData>();
raw->data_.resize(100 * sizeof(int64_t));
int64_t* p = (int64_t*)raw->data_.data();
for (auto i = 0; i < data_chunk->count_; ++i) {
p[i] = i;
}
data_chunk->fixed_fields_[milvus::engine::DEFAULT_UID_NAME] = raw;
}
{
milvus::engine::BinaryDataPtr raw = std::make_shared<milvus::engine::BinaryData>();
raw->data_.resize(100 * sizeof(int32_t));
int32_t* p = (int32_t*)raw->data_.data();
for (auto i = 0; i < data_chunk->count_; ++i) {
p[i] = i + 5000;
}
data_chunk->fixed_fields_[field_name] = raw;
}
int64_t row_count = 0;
status = db_->CountEntities(collection_name, row_count);
ASSERT_TRUE(status.ok());
ASSERT_EQ(row_count, entity_count);
status = db_->Insert(collection_name, "", data_chunk);
if (autogen_id == provide_id) {
ASSERT_FALSE(status.ok());
} else {
ASSERT_TRUE(status.ok());
}
status = db_->Flush();
ASSERT_TRUE(status.ok());
int64_t row_count = 0;
status = db_->CountEntities(collection_name, row_count);
ASSERT_TRUE(status.ok());
if (autogen_id == provide_id) {
ASSERT_EQ(row_count, 0);
} else {
ASSERT_EQ(row_count, data_chunk->count_);
}
status = db_->DropCollection(collection_name);
ASSERT_TRUE(status.ok());
};
do_insert(true, true);
do_insert(true, false);
do_insert(false, true);
do_insert(false, false);
}
TEST_F(DBTest, MergeTest) {