diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp
index bc448c78ac..798aae483c 100644
--- a/core/src/db/DBImpl.cpp
+++ b/core/src/db/DBImpl.cpp
@@ -66,15 +66,6 @@ DBImpl::DBImpl(const DBOptions& options)
     mem_mgr_ = MemManagerFactory::Build(options_);
     merge_mgr_ptr_ = MergeManagerFactory::SSBuild(options_);
 
-    if (options_.wal_enable_) {
-        // wal::MXLogConfiguration mxlog_config;
-        // mxlog_config.recovery_error_ignore = options_.recovery_error_ignore_;
-        // // 2 buffers in the WAL
-        // mxlog_config.buffer_size = options_.buffer_size_ / 2;
-        // mxlog_config.mxlog_path = options_.mxlog_path_;
-        // wal_mgr_ = std::make_shared<wal::WalManager>(mxlog_config);
-    }
-
     /* watch on storage.auto_flush_interval */
     ConfigMgr::GetInstance().Attach("storage.auto_flush_interval", this);
 
@@ -112,41 +103,10 @@ DBImpl::Start() {
 
     // TODO: merge files
 
-    // wal
-    if (options_.wal_enable_) {
-        return Status(SERVER_NOT_IMPLEMENT, "Wal not implemented");
-        // auto error_code = DB_ERROR;
-        // if (wal_mgr_ != nullptr) {
-        //     error_code = wal_mgr_->Init();
-        // }
-        // if (error_code != WAL_SUCCESS) {
-        //     throw Exception(error_code, "Wal init error!");
-        // }
-        //
-        // // recovery
-        // while (true) {
-        //     wal::MXLogRecord record;
-        //     auto error_code = wal_mgr_->GetNextRecovery(record);
-        //     if (error_code != WAL_SUCCESS) {
-        //         throw Exception(error_code, "Wal recovery error!");
-        //     }
-        //     if (record.type == wal::MXLogType::None) {
-        //         break;
-        //     }
-        //     ExecWalRecord(record);
-        // }
-        //
-        // // for distribute version, some nodes are read only
-        // if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
-        //     // background wal thread
-        //     bg_wal_thread_ = std::thread(&SSDBImpl::TimingWalThread, this);
-        // }
-    } else {
-        // for distribute version, some nodes are read only
-        if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
-            // background flush thread
-            bg_flush_thread_ = std::thread(&DBImpl::TimingFlushThread, this);
-        }
+    // for distribute version, some nodes are read only
+    if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
+        // background flush thread
+        bg_flush_thread_ = std::thread(&DBImpl::TimingFlushThread, this);
     }
 
     // for distribute version, some nodes are read only
@@ -173,20 +133,14 @@ DBImpl::Stop() {
     initialized_.store(false, std::memory_order_release);
 
     if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
-        if (options_.wal_enable_) {
-            // // wait wal thread finish
-            // swn_wal_.Notify();
-            // bg_wal_thread_.join();
-        } else {
-            // flush all without merge
-            wal::MXLogRecord record;
-            record.type = wal::MXLogType::Flush;
-            ExecWalRecord(record);
+        // flush all without merge
+        wal::MXLogRecord record;
+        record.type = wal::MXLogType::Flush;
+        ExecWalRecord(record);
 
-            // wait flush thread finish
-            swn_flush_.Notify();
-            bg_flush_thread_.join();
-        }
+        // wait flush thread finish
+        swn_flush_.Notify();
+        bg_flush_thread_.join();
 
         WaitMergeFileFinish();
@@ -212,36 +166,35 @@ DBImpl::CreateCollection(const snapshot::CreateCollectionContext& context) {
     CHECK_INITIALIZED;
 
     auto ctx = context;
-    // check uid params
-    bool has_uid = false;
+
+    // default id is auto-generated
+    auto params = ctx.collection->GetParams();
+    if (params.find(PARAM_UID_AUTOGEN) == params.end()) {
+        params[PARAM_UID_AUTOGEN] = true;
+        ctx.collection->SetParams(params);
+    }
+
+    // check uid existence
+    snapshot::FieldPtr uid_field;
     for (auto& pair : ctx.fields_schema) {
         if (pair.first->GetName() == DEFAULT_UID_NAME) {
-            has_uid = true;
-            json params = pair.first->GetParams();
-            if (params.find(PARAM_UID_AUTOGEN) == params.end()) {
-                params[PARAM_UID_AUTOGEN] = true;
-                pair.first->SetParams(params);
-            }
+            uid_field = pair.first;
             break;
         }
     }
 
     // add uid field if not specified
-    if (!has_uid) {
-        json params;
-        params[PARAM_UID_AUTOGEN] = true;
-        auto uid_field = std::make_shared<snapshot::Field>(DEFAULT_UID_NAME, 0, DataType::INT64, params);
-        auto bloom_filter_element = std::make_shared<snapshot::FieldElement>(
-            0, 0, DEFAULT_BLOOM_FILTER_NAME, milvus::engine::FieldElementType::FET_BLOOM_FILTER);
-        auto delete_doc_element = std::make_shared<snapshot::FieldElement>(
-            0, 0, DEFAULT_DELETED_DOCS_NAME, milvus::engine::FieldElementType::FET_DELETED_DOCS);
-
-        ctx.fields_schema[uid_field] = {bloom_filter_element, delete_doc_element};
+    if (uid_field == nullptr) {
+        uid_field = std::make_shared<snapshot::Field>(DEFAULT_UID_NAME, 0, DataType::INT64);
     }
 
-    if (options_.wal_enable_) {
-        // ctx.lsn = wal_mgr_->CreateCollection(context.collection->GetName());
-    }
+    // define uid elements
+    auto bloom_filter_element = std::make_shared<snapshot::FieldElement>(
+        0, 0, DEFAULT_BLOOM_FILTER_NAME, milvus::engine::FieldElementType::FET_BLOOM_FILTER);
+    auto delete_doc_element = std::make_shared<snapshot::FieldElement>(
+        0, 0, DEFAULT_DELETED_DOCS_NAME, milvus::engine::FieldElementType::FET_DELETED_DOCS);
+    ctx.fields_schema[uid_field] = {bloom_filter_element, delete_doc_element};
+
     auto op = std::make_shared<snapshot::CreateCollectionOperation>(ctx);
     return op->Push();
 }
@@ -256,11 +209,6 @@ DBImpl::DropCollection(const std::string& name) {
     auto& snapshots = snapshot::Snapshots::GetInstance();
     STATUS_CHECK(snapshots.GetSnapshot(ss, name));
 
-    if (options_.wal_enable_) {
-        // SS TODO
-        /* wal_mgr_->DropCollection(ss->GetCollectionId()); */
-    }
-
     mem_mgr_->EraseMem(ss->GetCollectionId());  // not allow insert
 
     return snapshots.DropCollection(ss->GetCollectionId(), std::numeric_limits<snapshot::LSN_TYPE>::max());
@@ -328,11 +276,6 @@ DBImpl::CreatePartition(const std::string& collection_name, const std::string& p
     STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));
 
     snapshot::LSN_TYPE lsn = 0;
-    if (options_.wal_enable_) {
-        // SS TODO
-        /* lsn = wal_mgr_->CreatePartition(collection_name, partition_tag); */
-    }
-
     snapshot::OperationContext context;
     context.lsn = lsn;
     auto op = std::make_shared<snapshot::CreatePartitionOperation>(context, ss);
@@ -492,24 +435,21 @@ DBImpl::Insert(const std::string& collection_name, const std::string& partition_
         return Status(DB_ERROR, "Field '_id' not found");
     }
 
-    auto& params = id_field->GetParams();
+    auto& params = ss->GetCollection()->GetParams();
     bool auto_increment = true;
     if (params.find(PARAM_UID_AUTOGEN) != params.end()) {
         auto_increment = params[PARAM_UID_AUTOGEN];
     }
 
-    // id is auto increment, but client provides id, return error
     FIXEDX_FIELD_MAP& fields = data_chunk->fixed_fields_;
+    auto pair = fields.find(engine::DEFAULT_UID_NAME);
     if (auto_increment) {
-        auto pair = fields.find(engine::DEFAULT_UID_NAME);
+        // id is auto increment, but client provides id, return error
        if (pair != fields.end() && pair->second != nullptr) {
             return Status(DB_ERROR, "Field '_id' is auto increment, no need to provide id");
         }
-    }
-
-    // id is not auto increment, but client doesn't provide id, return error
-    if (!auto_increment) {
-        auto pair = fields.find(engine::DEFAULT_UID_NAME);
+    } else {
+        // id is not auto increment, but client doesn't provide id, return error
         if (pair == fields.end() || pair->second == nullptr) {
             return Status(DB_ERROR, "Field '_id' is user defined");
         }
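(The hunk above moves the auto-id decision from the `_id` field's params to the collection-level params. A minimal standalone sketch of that validation rule, not taken from the patch; plain std types stand in for the Milvus snapshot/DataChunk classes and every name below is illustrative only.)

#include <iostream>
#include <map>
#include <string>

struct ValidationResult {
    bool ok;
    std::string message;
};

// Mirrors the rule above: "auto_id" comes from collection-level params and
// defaults to true; the chunk may or may not carry a user-provided "_id" column.
ValidationResult
ValidateIdField(const std::map<std::string, bool>& collection_params, bool chunk_has_id) {
    bool auto_id = true;  // default when the param is absent
    auto it = collection_params.find("auto_id");
    if (it != collection_params.end()) {
        auto_id = it->second;
    }

    if (auto_id && chunk_has_id) {
        return {false, "Field '_id' is auto increment, no need to provide id"};
    }
    if (!auto_id && !chunk_has_id) {
        return {false, "Field '_id' is user defined"};
    }
    return {true, "ok"};
}

int main() {
    // the four combinations exercised by the reworked InsertTest further below
    for (bool auto_id : {true, false}) {
        for (bool provide_id : {true, false}) {
            auto res = ValidateIdField({{"auto_id", auto_id}}, provide_id);
            std::cout << "auto_id=" << auto_id << ", provide_id=" << provide_id << " -> "
                      << (res.ok ? "accepted" : res.message) << "\n";
        }
    }
    return 0;
}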
@@ -526,31 +466,16 @@ DBImpl::Insert(const std::string& collection_name, const std::string& partition_
         data_chunk->fixed_fields_[engine::DEFAULT_UID_NAME] = id_data;
     }
 
-    if (options_.wal_enable_) {
-        return Status(SERVER_NOT_IMPLEMENT, "Wal not implemented");
-        // auto vector_it = entity.vector_data_.begin();
-        // if (!vector_it->second.binary_data_.empty()) {
-        //     wal_mgr_->InsertEntities(collection_name, partition_name, entity.id_array_,
-        //                              vector_it->second.binary_data_,
-        //                              attr_nbytes, attr_data);
-        // } else if (!vector_it->second.float_data_.empty()) {
-        //     wal_mgr_->InsertEntities(collection_name, partition_name, entity.id_array_,
-        //                              vector_it->second.float_data_,
-        //                              attr_nbytes, attr_data);
-        // }
-        // swn_wal_.Notify();
-    } else {
-        // insert entities: collection_name is field id
-        wal::MXLogRecord record;
-        record.lsn = 0;
-        record.collection_id = collection_name;
-        record.partition_tag = partition_name;
-        record.data_chunk = data_chunk;
-        record.length = data_chunk->count_;
-        record.type = wal::MXLogType::Entity;
+    // insert entities: collection_name is field id
+    wal::MXLogRecord record;
+    record.lsn = 0;
+    record.collection_id = collection_name;
+    record.partition_tag = partition_name;
+    record.data_chunk = data_chunk;
+    record.length = data_chunk->count_;
+    record.type = wal::MXLogType::Entity;
 
-        STATUS_CHECK(ExecWalRecord(record));
-    }
+    STATUS_CHECK(ExecWalRecord(record));
 
     return Status::OK();
 }
@@ -580,20 +505,14 @@ DBImpl::DeleteEntityByID(const std::string& collection_name, const engine::IDNum
     CHECK_INITIALIZED;
 
     Status status;
-    if (options_.wal_enable_) {
-        return Status(SERVER_NOT_IMPLEMENT, "Wal not implemented");
-        // wal_mgr_->DeleteById(collection_name, entity_ids);
-        // swn_wal_.Notify();
-    } else {
-        wal::MXLogRecord record;
-        record.lsn = 0;  // need to get from meta ?
-        record.type = wal::MXLogType::Delete;
-        record.collection_id = collection_name;
-        record.ids = entity_ids.data();
-        record.length = entity_ids.size();
+    wal::MXLogRecord record;
+    record.lsn = 0;  // need to get from meta ?
+    record.type = wal::MXLogType::Delete;
+    record.collection_id = collection_name;
+    record.ids = entity_ids.data();
+    record.length = entity_ids.size();
 
-        status = ExecWalRecord(record);
-    }
+    status = ExecWalRecord(record);
 
     return status;
 }
@@ -748,24 +667,7 @@ DBImpl::Flush(const std::string& collection_name) {
     }
 
     LOG_ENGINE_DEBUG_ << "Begin flush collection: " << collection_name;
-
-    if (options_.wal_enable_) {
-        return Status(SERVER_NOT_IMPLEMENT, "Wal not implemented");
-        // LOG_ENGINE_DEBUG_ << "WAL flush";
-        // auto lsn = wal_mgr_->Flush(collection_name);
-        // if (lsn != 0) {
-        //     swn_wal_.Notify();
-        //     flush_req_swn_.Wait();
-        // } else {
-        //     // no collection flushed, call merge task to cleanup files
-        //     std::set<std::string> merge_collection_names;
-        //     StartMergeTask(merge_collection_names);
-        // }
-    } else {
-        LOG_ENGINE_DEBUG_ << "MemTable flush";
-        InternalFlush(collection_name);
-    }
-
+    InternalFlush(collection_name);
     LOG_ENGINE_DEBUG_ << "End flush collection: " << collection_name;
 
     return status;
 }
@@ -778,29 +680,10 @@ DBImpl::Flush() {
     }
 
     LOG_ENGINE_DEBUG_ << "Begin flush all collections";
-
-    Status status;
-    fiu_do_on("options_wal_enable_false", options_.wal_enable_ = false);
-    if (options_.wal_enable_) {
-        return Status(SERVER_NOT_IMPLEMENT, "Wal not implemented");
-        // LOG_ENGINE_DEBUG_ << "WAL flush";
-        // auto lsn = wal_mgr_->Flush();
-        // if (lsn != 0) {
-        //     swn_wal_.Notify();
-        //     flush_req_swn_.Wait();
-        // } else {
-        //     // no collection flushed, call merge task to cleanup files
-        //     std::set<std::string> merge_collection_names;
-        //     StartMergeTask(merge_collection_names);
-        // }
-    } else {
-        LOG_ENGINE_DEBUG_ << "MemTable flush";
-        InternalFlush();
-    }
-
+    InternalFlush();
     LOG_ENGINE_DEBUG_ << "End flush all collections";
 
-    return status;
+    return Status::OK();
 }
 
 Status
Force flush"; @@ -1201,14 +1060,15 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) { } const std::lock_guard lock(flush_merge_compact_mutex_); - int64_t collection_name = ss->GetCollectionId(); - status = mem_mgr_->Flush(collection_name); + int64_t collection_id = ss->GetCollectionId(); + status = mem_mgr_->Flush(collection_id); if (!status.ok()) { return status; } std::set flushed_collections; - collections_flushed(record.collection_id, flushed_collections); + flushed_collections.insert(record.collection_id); + StartMergeTask(flushed_collections); } else { // flush all collections @@ -1230,10 +1090,7 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) { flushed_collections.insert(ss->GetName()); } - uint64_t lsn = collections_flushed("", flushed_collections); - if (options_.wal_enable_) { - // wal_mgr_->RemoveOldFiles(lsn); - } + StartMergeTask(flushed_collections); } break; } diff --git a/core/src/db/Types.cpp b/core/src/db/Types.cpp index 31677597bb..636c1e0086 100644 --- a/core/src/db/Types.cpp +++ b/core/src/db/Types.cpp @@ -16,7 +16,7 @@ namespace milvus { namespace engine { const char* DEFAULT_UID_NAME = "_id"; -const char* PARAM_UID_AUTOGEN = "auto_increment"; +const char* PARAM_UID_AUTOGEN = "auto_id"; const char* DEFAULT_RAW_DATA_NAME = "_raw"; const char* DEFAULT_BLOOM_FILTER_NAME = "_blf"; diff --git a/core/src/grpc/milvus.proto b/core/src/grpc/milvus.proto index 39b6ed275a..f51719c7f4 100644 --- a/core/src/grpc/milvus.proto +++ b/core/src/grpc/milvus.proto @@ -57,6 +57,12 @@ message FieldName { /** * @brief Collection mapping + * @extra_params: key-value pair for extra parameters of the collection + * typically usage: + * extra_params["params"] = {segment_row_count: 1000000, auto_id: true} + * Note: + * the segment_row_count specify segment row count limit for merging + * the auto_id = true means entity id is auto-generated by milvus */ message Mapping { Status status = 1; diff --git a/core/unittest/db/test_db.cpp b/core/unittest/db/test_db.cpp index 60db8c5aac..49f3f6742c 100644 --- a/core/unittest/db/test_db.cpp +++ b/core/unittest/db/test_db.cpp @@ -471,27 +471,73 @@ TEST_F(DBTest, QueryTest) { } TEST_F(DBTest, InsertTest) { - std::string collection_name = "MERGE_TEST"; - auto status = CreateCollection2(db_, collection_name, 0); - ASSERT_TRUE(status.ok()); + auto do_insert = [&](bool autogen_id, bool provide_id) -> void { + CreateCollectionContext context; + context.lsn = 0; + std::string collection_name = "INSERT_TEST"; + auto collection_schema = std::make_shared(collection_name); + milvus::json params; + params[milvus::engine::PARAM_UID_AUTOGEN] = autogen_id; + collection_schema->SetParams(params); + context.collection = collection_schema; - status = db_->Flush(); - ASSERT_TRUE(status.ok()); + std::string field_name = "field_0"; + auto field = std::make_shared(field_name, 0, milvus::engine::DataType::INT32); + context.fields_schema[field] = {}; - const uint64_t entity_count = 100; - milvus::engine::DataChunkPtr data_chunk; - BuildEntities(entity_count, 0, data_chunk); + field = std::make_shared(milvus::engine::DEFAULT_UID_NAME, 0, milvus::engine::DataType::INT64); + context.fields_schema[field] = {}; - status = db_->Insert(collection_name, "", data_chunk); - ASSERT_TRUE(status.ok()); + auto status = db_->CreateCollection(context); - status = db_->Flush(); - ASSERT_TRUE(status.ok()); + milvus::engine::DataChunkPtr data_chunk = std::make_shared(); + data_chunk->count_ = 100; + if (provide_id) { + milvus::engine::BinaryDataPtr raw = std::make_shared(); + 
diff --git a/core/unittest/db/test_db.cpp b/core/unittest/db/test_db.cpp
index 60db8c5aac..49f3f6742c 100644
--- a/core/unittest/db/test_db.cpp
+++ b/core/unittest/db/test_db.cpp
@@ -471,27 +471,73 @@ TEST_F(DBTest, QueryTest) {
 }
 
 TEST_F(DBTest, InsertTest) {
-    std::string collection_name = "MERGE_TEST";
-    auto status = CreateCollection2(db_, collection_name, 0);
-    ASSERT_TRUE(status.ok());
+    auto do_insert = [&](bool autogen_id, bool provide_id) -> void {
+        CreateCollectionContext context;
+        context.lsn = 0;
+        std::string collection_name = "INSERT_TEST";
+        auto collection_schema = std::make_shared<Collection>(collection_name);
+        milvus::json params;
+        params[milvus::engine::PARAM_UID_AUTOGEN] = autogen_id;
+        collection_schema->SetParams(params);
+        context.collection = collection_schema;
 
-    status = db_->Flush();
-    ASSERT_TRUE(status.ok());
+        std::string field_name = "field_0";
+        auto field = std::make_shared<Field>(field_name, 0, milvus::engine::DataType::INT32);
+        context.fields_schema[field] = {};
 
-    const uint64_t entity_count = 100;
-    milvus::engine::DataChunkPtr data_chunk;
-    BuildEntities(entity_count, 0, data_chunk);
+        field = std::make_shared<Field>(milvus::engine::DEFAULT_UID_NAME, 0, milvus::engine::DataType::INT64);
+        context.fields_schema[field] = {};
 
-    status = db_->Insert(collection_name, "", data_chunk);
-    ASSERT_TRUE(status.ok());
+        auto status = db_->CreateCollection(context);
 
-    status = db_->Flush();
-    ASSERT_TRUE(status.ok());
+        milvus::engine::DataChunkPtr data_chunk = std::make_shared<milvus::engine::DataChunk>();
+        data_chunk->count_ = 100;
+        if (provide_id) {
+            milvus::engine::BinaryDataPtr raw = std::make_shared<milvus::engine::BinaryData>();
+            raw->data_.resize(100 * sizeof(int64_t));
+            int64_t* p = (int64_t*)raw->data_.data();
+            for (auto i = 0; i < data_chunk->count_; ++i) {
+                p[i] = i;
+            }
+            data_chunk->fixed_fields_[milvus::engine::DEFAULT_UID_NAME] = raw;
+        }
+        {
+            milvus::engine::BinaryDataPtr raw = std::make_shared<milvus::engine::BinaryData>();
+            raw->data_.resize(100 * sizeof(int32_t));
+            int32_t* p = (int32_t*)raw->data_.data();
+            for (auto i = 0; i < data_chunk->count_; ++i) {
+                p[i] = i + 5000;
+            }
+            data_chunk->fixed_fields_[field_name] = raw;
+        }
 
-    int64_t row_count = 0;
-    status = db_->CountEntities(collection_name, row_count);
-    ASSERT_TRUE(status.ok());
-    ASSERT_EQ(row_count, entity_count);
+        status = db_->Insert(collection_name, "", data_chunk);
+        if (autogen_id == provide_id) {
+            ASSERT_FALSE(status.ok());
+        } else {
+            ASSERT_TRUE(status.ok());
+        }
+
+        status = db_->Flush();
+        ASSERT_TRUE(status.ok());
+
+        int64_t row_count = 0;
+        status = db_->CountEntities(collection_name, row_count);
+        ASSERT_TRUE(status.ok());
+        if (autogen_id == provide_id) {
+            ASSERT_EQ(row_count, 0);
+        } else {
+            ASSERT_EQ(row_count, data_chunk->count_);
+        }
+
+        status = db_->DropCollection(collection_name);
+        ASSERT_TRUE(status.ok());
+    };
+
+    do_insert(true, true);
+    do_insert(true, false);
+    do_insert(false, true);
+    do_insert(false, false);
 }
 
 TEST_F(DBTest, MergeTest) {
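(The reworked InsertTest builds each fixed-size column as a raw byte buffer before handing the chunk to Insert(). A standalone sketch of the same pack/unpack pattern, assuming only that the column buffer is a contiguous byte vector as the casts in the test imply; everything below is illustrative and not part of the patch.)

#include <cstdint>
#include <cstring>
#include <iostream>
#include <vector>

int main() {
    const int64_t row_count = 100;

    // pack: size the byte buffer for row_count int64 ids and write them in place,
    // the same way the test fills the "_id" column
    std::vector<uint8_t> id_column(row_count * sizeof(int64_t));
    int64_t* ids = reinterpret_cast<int64_t*>(id_column.data());
    for (int64_t i = 0; i < row_count; ++i) {
        ids[i] = i;
    }

    // unpack: read individual values back out of the same byte buffer
    int64_t first = 0;
    int64_t last = 0;
    std::memcpy(&first, id_column.data(), sizeof(int64_t));
    std::memcpy(&last, id_column.data() + (row_count - 1) * sizeof(int64_t), sizeof(int64_t));
    std::cout << "rows=" << row_count << " first=" << first << " last=" << last << std::endl;
    return 0;
}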