Merge branch 'master' into caiyd_codec_opt

pull/1538/head
Cai Yudong 2020-03-07 14:38:57 +08:00 committed by GitHub
commit 692986760b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 459 additions and 118 deletions

View File

@ -25,19 +25,22 @@ Please mark all change in change log and use the issue from GitHub
- \#1152 Error log output continuously after server start
- \#1211 Server down caused by searching with index_type: HNSW
- \#1240 Update license declaration
- \#1298 Unittest failed when on CPU2GPU case
- \#1298 Unit test failed when on CPU2GPU case
- \#1359 Negative distance value returned when searching with HNSW index type
- \#1429 Server crashed when searching vectors using GPU
- \#1429 Server crashed when searching vectors with GPU
- \#1476 Fix vectors results bug when getting vectors from segments
- \#1484 Index type changed to IDMAP after compacted
- \#1499 Fix duplicated ID number issue
- \#1491 Server crashed during adding vectors
- \#1504 Avoid possible race condition between delete and search
- \#1507 set_config for insert_buffer_size is wrong
- \#1510 Add set interfaces for WAL configurations
- \#1511 Fix big integer cannot pass to server correctly
- \#1518 Table count did not match after deleting vectors and compact
- \#1530 Set table file with correct engine type in meta
- \#1521 Make cache_insert_data take effect in-service
- \#1525 Add setter API for config preload_table
- \#1530 Set table file with correct engine type in meta
- \#1535 Degradation searching performance with metric_type: binary_idmap
## Feature
- \#216 Add CLI to get server info
@ -77,13 +80,14 @@ Please mark all change in change log and use the issue from GitHub
- \#1105 Error message is not clear when creating IVFSQ8H index without gpu resources
- \#740, #849, #878, #972, #1033, #1161, #1173, #1199, #1190, #1223, #1222, #1257, #1264, #1269, #1164, #1303, #1304, #1324, #1388, #1459 Various fixes and improvements for Milvus documentation.
- \#1297 Hide partition_name parameter, avoid user directly access partition table
- \#1310 Add default partition tag for a table
- \#1234 Do S3 server validation check when Milvus startup
- \#1263 Allow system conf modifiable and some take effect directly
- \#1310 Add default partition tag for a table
- \#1320 Remove debug logging from faiss
- \#1426 Support to configure whether to enable autoflush and the autoflush interval
- \#1444 Improve delete
- \#1480 Add return code for AVX512 selection
- \#1524 Update config "preload_table" description
- \#1537 Optimize raw vector and uids read/write
## Task

View File

@ -44,7 +44,8 @@ server_config:
#----------------------+------------------------------------------------------------+------------+-----------------+
# preload_table | A comma-separated list of table names that need to be pre- | StringList | |
# | loaded when Milvus server starts up. | | |
# | '*' means preload all existing tables. | | |
# | '*' means preload all existing tables (single-quote or | | |
# | double-quote required). | | |
#----------------------+------------------------------------------------------------+------------+-----------------+
# auto_flush_interval | The interval, in seconds, at which Milvus automatically | Integer | 1 (s) |
# | flushes data to disk. | | |

View File

@ -44,7 +44,8 @@ server_config:
#----------------------+------------------------------------------------------------+------------+-----------------+
# preload_table | A comma-separated list of table names that need to be pre- | StringList | |
# | loaded when Milvus server starts up. | | |
# | '*' means preload all existing tables. | | |
# | '*' means preload all existing tables (single-quote or | | |
# | double-quote required). | | |
#----------------------+------------------------------------------------------------+------------+-----------------+
# auto_flush_interval | The interval, in seconds, at which Milvus automatically | Integer | 1 (s) |
# | flushes data to disk. | | |

View File

@ -44,7 +44,8 @@ server_config:
#----------------------+------------------------------------------------------------+------------+-----------------+
# preload_table | A comma-separated list of table names that need to be pre- | StringList | |
# | loaded when Milvus server starts up. | | |
# | '*' means preload all existing tables. | | |
# | '*' means preload all existing tables (single-quote or | | |
# | double-quote required). | | |
#----------------------+------------------------------------------------------------+------------+-----------------+
# auto_flush_interval | The interval, in seconds, at which Milvus automatically | Integer | 1 (s) |
# | flushes data to disk. | | |

View File

@ -0,0 +1,114 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "config/handler/CacheConfigHandler.h"
#include "server/Config.h"
namespace milvus {
namespace server {
// Seeds the three locally cached cache settings from the Config singleton.
// The Status returned by each getter is not inspected here.
CacheConfigHandler::CacheConfigHandler() {
    Config& cfg = Config::GetInstance();

    cfg.GetCacheConfigCpuCacheCapacity(cpu_cache_capacity_);
    cfg.GetCacheConfigInsertBufferSize(insert_buffer_size_);
    cfg.GetCacheConfigCacheInsertData(cache_insert_data_);
}
// Cancels all three cache-config callbacks under this handler's identity_.
// The calls are unconditional: they run whether or not the matching Add*Listener() was ever invoked.
CacheConfigHandler::~CacheConfigHandler() {
    RemoveCpuCacheCapacityListener();
    RemoveInsertBufferSizeListener();
    RemoveCacheInsertDataListener();
}
// Default hook for a CPU cache capacity change: remembers the new value in cpu_cache_capacity_.
// Declared virtual in the class, so subclasses may override it.
void
CacheConfigHandler::OnCpuCacheCapacityChanged(int64_t value) {
    cpu_cache_capacity_ = value;
}
// Default hook for an insert buffer size change: remembers the new value in insert_buffer_size_.
// Declared virtual in the class, so subclasses may override it.
void
CacheConfigHandler::OnInsertBufferSizeChanged(int64_t value) {
    insert_buffer_size_ = value;
}
// Default hook for a cache_insert_data change: remembers the new flag in cache_insert_data_.
// Declared virtual in the class, so subclasses may override it.
void
CacheConfigHandler::OnCacheInsertDataChanged(bool value) {
    cache_insert_data_ = value;
}
void
CacheConfigHandler::AddCpuCacheCapacityListener() {
server::ConfigCallBackF lambda = [this](const std::string& value) -> Status {
server::Config& config = server::Config::GetInstance();
int64_t capacity;
auto status = config.GetCacheConfigCpuCacheCapacity(capacity);
if (status.ok()) {
OnCpuCacheCapacityChanged(capacity);
}
return status;
};
auto& config = server::Config::GetInstance();
config.RegisterCallBack(server::CONFIG_CACHE, server::CONFIG_CACHE_CPU_CACHE_CAPACITY, identity_, lambda);
}
void
CacheConfigHandler::AddInsertBufferSizeListener() {
server::ConfigCallBackF lambda = [this](const std::string& value) -> Status {
server::Config& config = server::Config::GetInstance();
int64_t size;
auto status = config.GetCacheConfigInsertBufferSize(size);
if (status.ok()) {
OnInsertBufferSizeChanged(size);
}
return status;
};
auto& config = server::Config::GetInstance();
config.RegisterCallBack(server::CONFIG_CACHE, server::CONFIG_CACHE_INSERT_BUFFER_SIZE, identity_, lambda);
}
void
CacheConfigHandler::AddCacheInsertDataListener() {
server::ConfigCallBackF lambda = [this](const std::string& value) -> Status {
auto& config = server::Config::GetInstance();
bool ok;
auto status = config.GetCacheConfigCacheInsertData(ok);
if (status.ok()) {
OnCacheInsertDataChanged(ok);
}
return status;
};
auto& config = server::Config::GetInstance();
config.RegisterCallBack(server::CONFIG_CACHE, server::CONFIG_CACHE_CACHE_INSERT_DATA, identity_, lambda);
}
// Cancels the (CONFIG_CACHE, CONFIG_CACHE_CPU_CACHE_CAPACITY) callback registered under identity_.
void
CacheConfigHandler::RemoveCpuCacheCapacityListener() {
    server::Config::GetInstance().CancelCallBack(server::CONFIG_CACHE, server::CONFIG_CACHE_CPU_CACHE_CAPACITY,
                                                 identity_);
}
// Cancels the (CONFIG_CACHE, CONFIG_CACHE_INSERT_BUFFER_SIZE) callback registered under identity_.
void
CacheConfigHandler::RemoveInsertBufferSizeListener() {
    server::Config::GetInstance().CancelCallBack(server::CONFIG_CACHE, server::CONFIG_CACHE_INSERT_BUFFER_SIZE,
                                                 identity_);
}
// Cancels the (CONFIG_CACHE, CONFIG_CACHE_CACHE_INSERT_DATA) callback registered under identity_.
void
CacheConfigHandler::RemoveCacheInsertDataListener() {
    server::Config::GetInstance().CancelCallBack(server::CONFIG_CACHE, server::CONFIG_CACHE_CACHE_INSERT_DATA,
                                                 identity_);
}
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,61 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <string>
#include "config/handler/ConfigHandler.h"
namespace milvus {
namespace server {
// Mixin for components that need to follow run-time changes of the cache settings
// (CPU cache capacity, insert buffer size, cache_insert_data).
//
// Usage, as seen in the implementing classes: call SetIdentity() (inherited from ConfigHandler),
// then the Add*Listener() for each setting of interest, and override the matching On*Changed() hook.
// The destructor cancels all three listeners.
class CacheConfigHandler : virtual public ConfigHandler {
 public:
    // Reads the current values of the three settings from Config.
    CacheConfigHandler();

    // Virtual because this class is a polymorphic base (it declares virtual hooks and is publicly
    // inherited); this keeps destruction through a CacheConfigHandler pointer well-defined.
    virtual ~CacheConfigHandler();

 protected:
    // Hooks invoked by the registered listeners with the freshly read value.
    // The default implementations just store the value in the private members below.
    virtual void
    OnCpuCacheCapacityChanged(int64_t value);

    virtual void
    OnInsertBufferSizeChanged(int64_t value);

    virtual void
    OnCacheInsertDataChanged(bool value);

 protected:
    // Register a Config callback (keyed by identity_) for the corresponding setting.
    void
    AddCpuCacheCapacityListener();

    void
    AddInsertBufferSizeListener();

    void
    AddCacheInsertDataListener();

    // Cancel the Config callback (keyed by identity_) for the corresponding setting.
    void
    RemoveCpuCacheCapacityListener();

    void
    RemoveInsertBufferSizeListener();

    void
    RemoveCacheInsertDataListener();

 private:
    // In-class defaults; the constructor passes these to the Config getters to be filled in.
    int64_t cpu_cache_capacity_ = 4 /*GiB*/;
    int64_t insert_buffer_size_ = 1 /*GiB*/;
    bool cache_insert_data_ = false;
};
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,35 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <string>
#include "server/Config.h"
#include "utils/Log.h"
namespace milvus {
namespace server {
// Common base of the config handler mixins: owns the identity string that the handlers pass to
// Config when registering and cancelling callbacks.
class ConfigHandler {
 protected:
    // Fills identity_ by passing `identity` to Config::GenUniqueIdentityID().
    void
    SetIdentity(const std::string& identity) {
        server::Config::GetInstance().GenUniqueIdentityID(identity, identity_);
    }

 protected:
    // Key under which this handler's callbacks are registered; empty until SetIdentity() is called.
    std::string identity_;
};
} // namespace server
} // namespace milvus

View File

@ -9,7 +9,7 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#ifdef MILVUS_GPU_VERSION
#include "config/handler/GpuBuildResHandler.h"
#include "config/handler/GpuBuildConfigHandler.h"
#include <string>
#include <vector>
@ -17,24 +17,23 @@
namespace milvus {
namespace server {
GpuBuildResHandler::GpuBuildResHandler() {
GpuBuildConfigHandler::GpuBuildConfigHandler() {
server::Config& config = server::Config::GetInstance();
config.GetGpuResourceConfigBuildIndexResources(build_gpus_);
}
GpuBuildResHandler::~GpuBuildResHandler() {
server::Config& config = server::Config::GetInstance();
config.CancelCallBack(server::CONFIG_GPU_RESOURCE, server::CONFIG_GPU_RESOURCE_BUILD_INDEX_RESOURCES, identity_);
GpuBuildConfigHandler::~GpuBuildConfigHandler() {
RemoveGpuBuildResListener();
}
////////////////////////////////////////////////////////////////
void
GpuBuildResHandler::OnGpuBuildResChanged(const std::vector<int64_t>& gpus) {
GpuBuildConfigHandler::OnGpuBuildResChanged(const std::vector<int64_t>& gpus) {
build_gpus_ = gpus;
}
void
GpuBuildResHandler::AddGpuBuildResListener() {
GpuBuildConfigHandler::AddGpuBuildResListener() {
server::Config& config = server::Config::GetInstance();
server::ConfigCallBackF lambda = [this](const std::string& value) -> Status {
server::Config& config = server::Config::GetInstance();
@ -51,7 +50,7 @@ GpuBuildResHandler::AddGpuBuildResListener() {
}
void
GpuBuildResHandler::RemoveGpuBuildResListener() {
GpuBuildConfigHandler::RemoveGpuBuildResListener() {
auto& config = server::Config::GetInstance();
config.CancelCallBack(server::CONFIG_GPU_RESOURCE, server::CONFIG_GPU_RESOURCE_BUILD_INDEX_RESOURCES, identity_);
}

View File

@ -13,16 +13,16 @@
#include <vector>
#include "config/handler/GpuResourcesHandler.h"
#include "config/handler/GpuConfigHandler.h"
namespace milvus {
namespace server {
class GpuBuildResHandler : virtual public GpuResourcesHandler {
class GpuBuildConfigHandler : virtual public GpuConfigHandler {
public:
GpuBuildResHandler();
GpuBuildConfigHandler();
~GpuBuildResHandler();
~GpuBuildConfigHandler();
public:
virtual void

View File

@ -10,38 +10,32 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#ifdef MILVUS_GPU_VERSION
#include "config/handler/GpuResourcesHandler.h"
#include "config/handler/GpuConfigHandler.h"
namespace milvus {
namespace server {
GpuResourcesHandler::GpuResourcesHandler() {
GpuConfigHandler::GpuConfigHandler() {
server::Config& config = server::Config::GetInstance();
config.GetGpuResourceConfigEnable(gpu_enable_);
}
GpuResourcesHandler::~GpuResourcesHandler() {
GpuConfigHandler::~GpuConfigHandler() {
RemoveGpuEnableListener();
}
//////////////////////////////////////////////////////////////
void
GpuResourcesHandler::OnGpuEnableChanged(bool enable) {
GpuConfigHandler::OnGpuEnableChanged(bool enable) {
gpu_enable_ = enable;
}
void
GpuResourcesHandler::SetIdentity(const std::string& identity) {
server::Config& config = server::Config::GetInstance();
config.GenUniqueIdentityID(identity, identity_);
}
void
GpuResourcesHandler::AddGpuEnableListener() {
server::Config& config = server::Config::GetInstance();
GpuConfigHandler::AddGpuEnableListener() {
auto& config = server::Config::GetInstance();
server::ConfigCallBackF lambda = [this](const std::string& value) -> Status {
server::Config& config = server::Config::GetInstance();
auto& config = server::Config::GetInstance();
bool enable;
auto status = config.GetGpuResourceConfigEnable(enable);
if (status.ok()) {
@ -54,7 +48,7 @@ GpuResourcesHandler::AddGpuEnableListener() {
}
void
GpuResourcesHandler::RemoveGpuEnableListener() {
GpuConfigHandler::RemoveGpuEnableListener() {
server::Config& config = server::Config::GetInstance();
config.CancelCallBack(server::CONFIG_GPU_RESOURCE, server::CONFIG_GPU_RESOURCE_ENABLE, identity_);
}

View File

@ -15,25 +15,23 @@
#include <limits>
#include <string>
#include "config/handler/ConfigHandler.h"
#include "server/Config.h"
namespace milvus {
namespace server {
class GpuResourcesHandler {
class GpuConfigHandler : virtual public ConfigHandler {
public:
GpuResourcesHandler();
GpuConfigHandler();
~GpuResourcesHandler();
~GpuConfigHandler();
protected:
virtual void
OnGpuEnableChanged(bool enable);
protected:
void
SetIdentity(const std::string& identity);
void
AddGpuEnableListener();
@ -42,7 +40,6 @@ class GpuResourcesHandler {
protected:
bool gpu_enable_ = true;
std::string identity_;
};
} // namespace server

View File

@ -9,7 +9,7 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#ifdef MILVUS_GPU_VERSION
#include "config/handler/GpuSearchResHandler.h"
#include "config/handler/GpuSearchConfigHandler.h"
#include <string>
#include <vector>
@ -19,7 +19,7 @@
namespace milvus {
namespace server {
GpuSearchResHandler::GpuSearchResHandler() {
GpuSearchConfigHandler::GpuSearchConfigHandler() {
server::Config& config = server::Config::GetInstance();
Status s = config.GetEngineConfigGpuSearchThreshold(threshold_);
@ -30,24 +30,24 @@ GpuSearchResHandler::GpuSearchResHandler() {
config.GetGpuResourceConfigSearchResources(search_gpus_);
}
GpuSearchResHandler::~GpuSearchResHandler() {
GpuSearchConfigHandler::~GpuSearchConfigHandler() {
RemoveGpuSearchThresholdListener();
RemoveGpuSearchResListener();
}
////////////////////////////////////////////////////////////////////////
void
GpuSearchResHandler::OnGpuSearchThresholdChanged(int64_t threshold) {
GpuSearchConfigHandler::OnGpuSearchThresholdChanged(int64_t threshold) {
threshold_ = threshold;
}
void
GpuSearchResHandler::OnGpuSearchResChanged(const std::vector<int64_t>& gpus) {
GpuSearchConfigHandler::OnGpuSearchResChanged(const std::vector<int64_t>& gpus) {
search_gpus_ = gpus;
}
void
GpuSearchResHandler::AddGpuSearchThresholdListener() {
GpuSearchConfigHandler::AddGpuSearchThresholdListener() {
server::Config& config = server::Config::GetInstance();
server::ConfigCallBackF lambda_gpu_threshold = [this](const std::string& value) -> Status {
@ -65,7 +65,7 @@ GpuSearchResHandler::AddGpuSearchThresholdListener() {
}
void
GpuSearchResHandler::AddGpuSearchResListener() {
GpuSearchConfigHandler::AddGpuSearchResListener() {
server::Config& config = server::Config::GetInstance();
server::ConfigCallBackF lambda_gpu_search_res = [this](const std::string& value) -> Status {
@ -83,13 +83,13 @@ GpuSearchResHandler::AddGpuSearchResListener() {
}
void
GpuSearchResHandler::RemoveGpuSearchThresholdListener() {
GpuSearchConfigHandler::RemoveGpuSearchThresholdListener() {
server::Config& config = server::Config::GetInstance();
config.CancelCallBack(server::CONFIG_ENGINE, server::CONFIG_ENGINE_GPU_SEARCH_THRESHOLD, identity_);
}
void
GpuSearchResHandler::RemoveGpuSearchResListener() {
GpuSearchConfigHandler::RemoveGpuSearchResListener() {
auto& config = server::Config::GetInstance();
config.CancelCallBack(server::CONFIG_GPU_RESOURCE, server::CONFIG_GPU_RESOURCE_SEARCH_RESOURCES, identity_);
}

View File

@ -14,16 +14,16 @@
#include <limits>
#include <vector>
#include "config/handler/GpuResourcesHandler.h"
#include "config/handler/GpuConfigHandler.h"
namespace milvus {
namespace server {
class GpuSearchResHandler : virtual public GpuResourcesHandler {
class GpuSearchConfigHandler : virtual public GpuConfigHandler {
public:
GpuSearchResHandler();
GpuSearchConfigHandler();
~GpuSearchResHandler();
~GpuSearchConfigHandler();
public:
virtual void

View File

@ -75,10 +75,14 @@ DBImpl::DBImpl(const DBOptions& options)
wal_mgr_ = std::make_shared<wal::WalManager>(mxlog_config);
}
SetIdentity("DBImpl");
AddCacheInsertDataListener();
Start();
}
// Cancels the cache_insert_data config listener first, then calls Stop().
DBImpl::~DBImpl() {
RemoveCacheInsertDataListener();
Stop();
}
@ -2084,5 +2088,10 @@ DBImpl::BackgroundWalTask() {
}
}
// CacheConfigHandler hook: copies the new cache_insert_data value into
// options_.insert_cache_immediately_.
void
DBImpl::OnCacheInsertDataChanged(bool value) {
options_.insert_cache_immediately_ = value;
}
} // namespace engine
} // namespace milvus

View File

@ -22,7 +22,8 @@
#include <thread>
#include <vector>
#include "DB.h"
#include "config/handler/CacheConfigHandler.h"
#include "db/DB.h"
#include "db/IndexFailedChecker.h"
#include "db/OngoingFileChecker.h"
#include "db/Types.h"
@ -37,7 +38,7 @@ namespace meta {
class Meta;
}
class DBImpl : public DB {
class DBImpl : public DB, public server::CacheConfigHandler {
public:
explicit DBImpl(const DBOptions& options);
~DBImpl();
@ -146,6 +147,10 @@ class DBImpl : public DB {
Status
Size(uint64_t& result) override;
protected:
void
OnCacheInsertDataChanged(bool value) override;
private:
Status
QueryAsync(const std::shared_ptr<server::Context>& context, const std::string& table_id,
@ -226,7 +231,7 @@ class DBImpl : public DB {
BackgroundWalTask();
private:
const DBOptions options_;
DBOptions options_;
std::atomic<bool> initialized_;

View File

@ -452,6 +452,10 @@ ExecutionEngineImpl::Load(bool to_cache) {
status = std::static_pointer_cast<BFIndex>(index_)->AddWithoutIds(vectors->GetCount(),
float_vectors.data(), Config());
status = std::static_pointer_cast<BFIndex>(index_)->SetBlacklist(concurrent_bitset_ptr);
int64_t index_size = vectors->GetCount() * conf->d * sizeof(float);
int64_t bitset_size = vectors->GetCount() / 8;
index_->set_size(index_size + bitset_size);
} else if (index_type_ == EngineType::FAISS_BIN_IDMAP) {
ec = std::static_pointer_cast<BinBFIndex>(index_)->Build(conf);
if (ec != KNOWHERE_SUCCESS) {
@ -460,6 +464,10 @@ ExecutionEngineImpl::Load(bool to_cache) {
status = std::static_pointer_cast<BinBFIndex>(index_)->AddWithoutIds(vectors->GetCount(),
vectors_data.data(), Config());
status = std::static_pointer_cast<BinBFIndex>(index_)->SetBlacklist(concurrent_bitset_ptr);
int64_t index_size = vectors->GetCount() * conf->d * sizeof(uint8_t);
int64_t bitset_size = vectors->GetCount() / 8;
index_->set_size(index_size + bitset_size);
}
if (!status.ok()) {
return status;

View File

@ -274,5 +274,10 @@ MemManagerImpl::GetMaxLSN(const MemList& tables) {
return max_lsn;
}
// CacheConfigHandler hook: stores the new insert buffer size into options_.
// The incoming value is scaled by ONE_GB before being stored.
void
MemManagerImpl::OnInsertBufferSizeChanged(int64_t value) {
options_.insert_buffer_size_ = value * ONE_GB;
}
} // namespace engine
} // namespace milvus

View File

@ -19,8 +19,9 @@
#include <string>
#include <vector>
#include "MemManager.h"
#include "MemTable.h"
#include "config/handler/CacheConfigHandler.h"
#include "db/insert/MemManager.h"
#include "db/insert/MemTable.h"
#include "db/meta/Meta.h"
#include "server/Config.h"
#include "utils/Status.h"
@ -28,33 +29,15 @@
namespace milvus {
namespace engine {
class MemManagerImpl : public MemManager {
class MemManagerImpl : public MemManager, public server::CacheConfigHandler {
public:
using Ptr = std::shared_ptr<MemManagerImpl>;
using MemIdMap = std::map<std::string, MemTablePtr>;
using MemList = std::vector<MemTablePtr>;
MemManagerImpl(const meta::MetaPtr& meta, const DBOptions& options) : meta_(meta), options_(options) {
server::Config& config = server::Config::GetInstance();
config.GenUniqueIdentityID("MemManagerImpl", identity_);
server::ConfigCallBackF lambda = [this](const std::string& value) -> Status {
server::Config& config = server::Config::GetInstance();
int64_t buffer_size;
auto status = config.GetCacheConfigInsertBufferSize(buffer_size);
if (status.ok()) {
options_.insert_buffer_size_ = buffer_size * ONE_GB;
}
return status;
};
config.RegisterCallBack(server::CONFIG_CACHE, server::CONFIG_CACHE_INSERT_BUFFER_SIZE, identity_, lambda);
}
~MemManagerImpl() {
server::Config& config = server::Config::GetInstance();
config.CancelCallBack(server::CONFIG_CACHE, server::CONFIG_CACHE_INSERT_BUFFER_SIZE, identity_);
SetIdentity("MemManagerImpl");
AddInsertBufferSizeListener();
}
Status
@ -92,6 +75,10 @@ class MemManagerImpl : public MemManager {
size_t
GetCurrentMem() override;
protected:
void
OnInsertBufferSizeChanged(int64_t value) override;
private:
MemTablePtr
GetMemByTable(const std::string& table_id);
@ -108,7 +95,6 @@ class MemManagerImpl : public MemManager {
uint64_t
GetMaxLSN(const MemList& tables);
std::string identity_;
MemIdMap mem_id_map_;
MemList immu_mem_list_;
meta::MetaPtr meta_;

View File

@ -30,6 +30,8 @@ namespace engine {
// Stores the table id, meta handle and a copy of the options, then subscribes to
// cache_insert_data config changes under an identity derived from "MemTable".
MemTable::MemTable(const std::string& table_id, const meta::MetaPtr& meta, const DBOptions& options)
: table_id_(table_id), meta_(meta), options_(options) {
SetIdentity("MemTable");
AddCacheInsertDataListener();
}
Status
@ -381,5 +383,10 @@ MemTable::SetLSN(uint64_t lsn) {
lsn_ = lsn;
}
// CacheConfigHandler hook: copies the new cache_insert_data value into
// options_.insert_cache_immediately_.
void
MemTable::OnCacheInsertDataChanged(bool value) {
options_.insert_cache_immediately_ = value;
}
} // namespace engine
} // namespace milvus

View File

@ -18,14 +18,15 @@
#include <string>
#include <vector>
#include "MemTableFile.h"
#include "VectorSource.h"
#include "config/handler/CacheConfigHandler.h"
#include "db/insert/MemTableFile.h"
#include "db/insert/VectorSource.h"
#include "utils/Status.h"
namespace milvus {
namespace engine {
class MemTable {
class MemTable : public server::CacheConfigHandler {
public:
using MemTableFileList = std::vector<MemTableFilePtr>;
@ -64,6 +65,10 @@ class MemTable {
void
SetLSN(uint64_t lsn);
protected:
void
OnCacheInsertDataChanged(bool value) override;
private:
Status
ApplyDeletes();

View File

@ -40,6 +40,13 @@ MemTableFile::MemTableFile(const std::string& table_id, const meta::MetaPtr& met
utils::GetParentPath(table_file_schema_.location_, directory);
segment_writer_ptr_ = std::make_shared<segment::SegmentWriter>(directory);
}
SetIdentity("MemTableFile");
AddCacheInsertDataListener();
}
// Cancels the cache_insert_data config listener that this object registered.
MemTableFile::~MemTableFile() {
RemoveCacheInsertDataListener();
}
Status
@ -216,5 +223,10 @@ MemTableFile::GetSegmentId() const {
return table_file_schema_.segment_id_;
}
// CacheConfigHandler hook: copies the new cache_insert_data value into
// options_.insert_cache_immediately_.
void
MemTableFile::OnCacheInsertDataChanged(bool value) {
options_.insert_cache_immediately_ = value;
}
} // namespace engine
} // namespace milvus

View File

@ -17,18 +17,22 @@
#include <string>
#include <vector>
#include "VectorSource.h"
#include "config/handler/CacheConfigHandler.h"
#include "db/engine/ExecutionEngine.h"
#include "db/insert/VectorSource.h"
#include "db/meta/Meta.h"
#include "utils/Status.h"
namespace milvus {
namespace engine {
class MemTableFile {
class MemTableFile : public server::CacheConfigHandler {
public:
MemTableFile(const std::string& table_id, const meta::MetaPtr& meta, const DBOptions& options);
~MemTableFile();
public:
Status
Add(const VectorSourcePtr& source);
@ -53,6 +57,10 @@ class MemTableFile {
const std::string&
GetSegmentId() const;
protected:
void
OnCacheInsertDataChanged(bool value) override;
private:
Status
CreateTableFile();

View File

@ -73,9 +73,6 @@ NSG::Load(const BinarySet& index_binary) {
DatasetPtr
NSG::Search(const DatasetPtr& dataset, const Config& config) {
auto build_cfg = std::dynamic_pointer_cast<NSGCfg>(config);
// if (build_cfg != nullptr) {
// build_cfg->CheckValid(); // throw exception
// }
if (!index_ || !index_->is_trained) {
KNOWHERE_THROW_MSG("index not initialize or trained");
@ -120,7 +117,6 @@ NSG::Train(const DatasetPtr& dataset, const Config& config) {
preprocess_index->Add(dataset, config);
preprocess_index->GenGraph(raw_data, build_cfg->knng, knng, config);
} else {
// TODO(linxj): use ivf instead?
auto gpu_idx = cloner::CopyCpuToGpu(idmap, build_cfg->gpu_id, config);
auto gpu_idmap = std::dynamic_pointer_cast<GPUIDMAP>(gpu_idx);
gpu_idmap->GenGraph(raw_data, build_cfg->knng, knng, config);

View File

@ -19,6 +19,12 @@ namespace scheduler {
// Creates a BUILD job that takes ownership of the given meta handle and options (both moved in),
// then subscribes to cache_insert_data config changes under an identity derived from "BuildIndexJob".
BuildIndexJob::BuildIndexJob(engine::meta::MetaPtr meta_ptr, engine::DBOptions options)
: Job(JobType::BUILD), meta_ptr_(std::move(meta_ptr)), options_(std::move(options)) {
SetIdentity("BuildIndexJob");
AddCacheInsertDataListener();
}
// Cancels the cache_insert_data config listener registered by the constructor.
BuildIndexJob::~BuildIndexJob() {
RemoveCacheInsertDataListener();
}
bool
@ -58,5 +64,10 @@ BuildIndexJob::Dump() const {
return ret;
}
// CacheConfigHandler hook: copies the new cache_insert_data value into
// options_.insert_cache_immediately_.
void
BuildIndexJob::OnCacheInsertDataChanged(bool value) {
options_.insert_cache_immediately_ = value;
}
} // namespace scheduler
} // namespace milvus

View File

@ -21,9 +21,10 @@
#include <unordered_map>
#include <vector>
#include "Job.h"
#include "config/handler/CacheConfigHandler.h"
#include "db/meta/Meta.h"
#include "scheduler/Definition.h"
#include "scheduler/job/Job.h"
namespace milvus {
namespace scheduler {
@ -33,10 +34,12 @@ using engine::meta::TableFileSchemaPtr;
using Id2ToIndexMap = std::unordered_map<size_t, TableFileSchemaPtr>;
using Id2ToTableFileMap = std::unordered_map<size_t, TableFileSchema>;
class BuildIndexJob : public Job {
class BuildIndexJob : public Job, public server::CacheConfigHandler {
public:
explicit BuildIndexJob(engine::meta::MetaPtr meta_ptr, engine::DBOptions options);
~BuildIndexJob();
public:
bool
AddToIndexFiles(const TableFileSchemaPtr& to_index_file);
@ -71,6 +74,10 @@ class BuildIndexJob : public Job {
return options_;
}
protected:
void
OnCacheInsertDataChanged(bool value) override;
private:
Id2ToIndexMap to_index_files_;
engine::meta::MetaPtr meta_ptr_;

View File

@ -22,13 +22,13 @@
#include <unordered_map>
#include <vector>
#include "config/handler/GpuBuildResHandler.h"
#include "config/handler/GpuBuildConfigHandler.h"
#include "scheduler/optimizer/Pass.h"
namespace milvus {
namespace scheduler {
class BuildIndexPass : public Pass, public server::GpuBuildResHandler {
class BuildIndexPass : public Pass, public server::GpuBuildConfigHandler {
public:
BuildIndexPass() = default;

View File

@ -23,13 +23,13 @@
#include <unordered_map>
#include <vector>
#include "config/handler/GpuSearchResHandler.h"
#include "config/handler/GpuSearchConfigHandler.h"
#include "scheduler/optimizer/Pass.h"
namespace milvus {
namespace scheduler {
class FaissFlatPass : public Pass, public server::GpuSearchResHandler {
class FaissFlatPass : public Pass, public server::GpuSearchConfigHandler {
public:
FaissFlatPass() = default;

View File

@ -23,13 +23,13 @@
#include <unordered_map>
#include <vector>
#include "config/handler/GpuSearchResHandler.h"
#include "config/handler/GpuSearchConfigHandler.h"
#include "scheduler/optimizer/Pass.h"
namespace milvus {
namespace scheduler {
class FaissIVFFlatPass : public Pass, public server::GpuSearchResHandler {
class FaissIVFFlatPass : public Pass, public server::GpuSearchConfigHandler {
public:
FaissIVFFlatPass() = default;

View File

@ -23,13 +23,13 @@
#include <unordered_map>
#include <vector>
#include "config/handler/GpuSearchResHandler.h"
#include "config/handler/GpuSearchConfigHandler.h"
#include "scheduler/optimizer/Pass.h"
namespace milvus {
namespace scheduler {
class FaissIVFPQPass : public Pass, public server::GpuSearchResHandler {
class FaissIVFPQPass : public Pass, public server::GpuSearchConfigHandler {
public:
FaissIVFPQPass() = default;

View File

@ -23,13 +23,13 @@
#include <unordered_map>
#include <vector>
#include "config/handler/GpuSearchResHandler.h"
#include "config/handler/GpuSearchConfigHandler.h"
#include "scheduler/optimizer/Pass.h"
namespace milvus {
namespace scheduler {
class FaissIVFSQ8HPass : public Pass, public server::GpuSearchResHandler {
class FaissIVFSQ8HPass : public Pass, public server::GpuSearchConfigHandler {
public:
FaissIVFSQ8HPass() = default;

View File

@ -23,13 +23,13 @@
#include <unordered_map>
#include <vector>
#include "config/handler/GpuSearchResHandler.h"
#include "config/handler/GpuSearchConfigHandler.h"
#include "scheduler/optimizer/Pass.h"
namespace milvus {
namespace scheduler {
class FaissIVFSQ8Pass : public Pass, public server::GpuSearchResHandler {
class FaissIVFSQ8Pass : public Pass, public server::GpuSearchConfigHandler {
public:
FaissIVFSQ8Pass() = default;

View File

@ -9,8 +9,12 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include <cache/CpuCacheMgr.h>
#include <cache/GpuCacheMgr.h>
#include <fiu-local.h>
#include <sys/stat.h>
#include <unistd.h>
#include <algorithm>
#include <chrono>
#include <iostream>
@ -29,10 +33,6 @@
#include "utils/StringHelpFunctions.h"
#include "utils/ValidationUtil.h"
#include <cache/CpuCacheMgr.h>
#include <cache/GpuCacheMgr.h>
#include <fiu-local.h>
namespace milvus {
namespace server {
@ -262,6 +262,7 @@ Config::ResetDefaultConfig() {
CONFIG_CHECK(SetDBConfigPreloadTable(CONFIG_DB_PRELOAD_TABLE_DEFAULT));
CONFIG_CHECK(SetDBConfigArchiveDiskThreshold(CONFIG_DB_ARCHIVE_DISK_THRESHOLD_DEFAULT));
CONFIG_CHECK(SetDBConfigArchiveDaysThreshold(CONFIG_DB_ARCHIVE_DAYS_THRESHOLD_DEFAULT));
CONFIG_CHECK(SetDBConfigAutoFlushInterval(CONFIG_DB_AUTO_FLUSH_INTERVAL_DEFAULT));
/* storage config */
CONFIG_CHECK(SetStorageConfigPrimaryPath(CONFIG_STORAGE_PRIMARY_PATH_DEFAULT));
@ -352,6 +353,8 @@ Config::SetConfigCli(const std::string& parent_key, const std::string& child_key
status = SetDBConfigBackendUrl(value);
} else if (child_key == CONFIG_DB_PRELOAD_TABLE) {
status = SetDBConfigPreloadTable(value);
} else if (child_key == CONFIG_DB_AUTO_FLUSH_INTERVAL) {
status = SetDBConfigAutoFlushInterval(value);
} else {
status = Status(SERVER_UNEXPECTED_ERROR, invalid_node_str);
}
@ -428,7 +431,11 @@ Config::SetConfigCli(const std::string& parent_key, const std::string& child_key
}
#endif
} else if (parent_key == CONFIG_TRACING) {
return Status(SERVER_UNSUPPORTED_ERROR, "Not support set tracing_config currently");
if (child_key == CONFIG_TRACING_JSON_CONFIG_PATH) {
status = SetTracingConfigJsonConfigPath(value);
} else {
status = Status(SERVER_UNEXPECTED_ERROR, invalid_node_str);
}
} else if (parent_key == CONFIG_WAL) {
if (child_key == CONFIG_WAL_ENABLE) {
status = SetWalConfigEnable(value);
@ -504,7 +511,7 @@ Config::GenUniqueIdentityID(const std::string& identity, std::string& uid) {
// get current timestamp
auto time_now = std::chrono::system_clock::now();
auto duration_in_ms = std::chrono::duration_cast<std::chrono::microseconds>(time_now.time_since_epoch());
auto duration_in_ms = std::chrono::duration_cast<std::chrono::nanoseconds>(time_now.time_since_epoch());
elements.push_back(std::to_string(duration_in_ms.count()));
StringHelpFunctions::MergeStringWithDelimeter(elements, "-", uid);
@ -763,12 +770,17 @@ Config::CheckDBConfigBackendUrl(const std::string& value) {
Status
Config::CheckDBConfigPreloadTable(const std::string& value) {
fiu_return_on("check_config_preload_table_fail", Status(SERVER_INVALID_ARGUMENT, ""));
if (value.empty() || value == "*") {
return Status::OK();
}
std::vector<std::string> tables;
StringHelpFunctions::SplitStringByDelimeter(value, ",", tables);
std::unordered_set<std::string> table_set;
for (auto& table : tables) {
if (!ValidationUtil::ValidateTableName(table).ok()) {
return Status(SERVER_INVALID_ARGUMENT, "Invalid table name: " + table);
@ -778,6 +790,14 @@ Config::CheckDBConfigPreloadTable(const std::string& value) {
if (!(status.ok() && exist)) {
return Status(SERVER_TABLE_NOT_EXIST, "Table " + table + " not exist");
}
table_set.insert(table);
}
if (table_set.size() != tables.size()) {
std::string msg =
"Invalid preload tables. "
"Possible reason: db_config.preload_table contains duplicate table.";
return Status(SERVER_INVALID_ARGUMENT, msg);
}
return Status::OK();
@ -811,7 +831,10 @@ Config::CheckDBConfigArchiveDaysThreshold(const std::string& value) {
Status
Config::CheckDBConfigAutoFlushInterval(const std::string& value) {
if (!ValidationUtil::ValidateStringIsNumber(value).ok()) {
auto exist_error = !ValidationUtil::ValidateStringIsNumber(value).ok();
fiu_do_on("check_config_auto_flush_interval_fail", exist_error = true);
if (exist_error) {
std::string msg = "Invalid db configuration auto_flush_interval: " + value +
". Possible reason: db.auto_flush_interval is not a natural number.";
return Status(SERVER_INVALID_ARGUMENT, msg);
@ -986,8 +1009,9 @@ Config::CheckCacheConfigCpuCacheCapacity(const std::string& value) {
std::cerr << "WARNING: cpu cache capacity value is too big" << std::endl;
}
int64_t buffer_value;
CONFIG_CHECK(GetCacheConfigInsertBufferSize(buffer_value));
std::string str =
GetConfigStr(CONFIG_CACHE, CONFIG_CACHE_INSERT_BUFFER_SIZE, CONFIG_CACHE_INSERT_BUFFER_SIZE_DEFAULT);
int64_t buffer_value = std::stoll(str);
int64_t insert_buffer_size = buffer_value * GB;
fiu_do_on("Config.CheckCacheConfigCpuCacheCapacity.large_insert_buffer", insert_buffer_size = total_mem + 1);
@ -1035,9 +1059,13 @@ Config::CheckCacheConfigInsertBufferSize(const std::string& value) {
return Status(SERVER_INVALID_ARGUMENT, msg);
}
std::string str =
GetConfigStr(CONFIG_CACHE, CONFIG_CACHE_CPU_CACHE_CAPACITY, CONFIG_CACHE_CPU_CACHE_CAPACITY_DEFAULT);
int64_t cache_size = std::stoll(str);
uint64_t total_mem = 0, free_mem = 0;
CommonUtil::GetSystemMemInfo(total_mem, free_mem);
if (buffer_size >= total_mem) {
if (buffer_size + cache_size >= total_mem) {
std::string msg = "Invalid insert buffer size: " + value +
". Possible reason: cache_config.insert_buffer_size exceeds system memory.";
return Status(SERVER_INVALID_ARGUMENT, msg);
@ -1258,6 +1286,13 @@ Config::CheckGpuResourceConfigBuildIndexResources(const std::vector<std::string>
}
#endif
/* tracing config */
// Validates a candidate value for tracing_config.json_config_path.
//
// Changing this item is not supported, so the check unconditionally rejects
// every value with SERVER_INVALID_ARGUMENT. `value` is only echoed back in the
// error message so the caller can see what was refused.
Status
Config::CheckTracingConfigJsonConfigPath(const std::string& value) {
    // The message names the tracing section (it previously said "wal config",
    // a copy-paste slip from the WAL checks that follow).
    std::string msg = "Invalid tracing config: " + value +
                      ". Possible reason: tracing_config.json_config_path is not supported to configure.";
    return Status(SERVER_INVALID_ARGUMENT, msg);
}
/* wal config */
Status
@ -1835,6 +1870,12 @@ Config::SetDBConfigArchiveDaysThreshold(const std::string& value) {
return SetConfigValueInMem(CONFIG_DB, CONFIG_DB_ARCHIVE_DAYS_THRESHOLD, value);
}
// Sets db_config.auto_flush_interval.
//
// The value is first validated by CheckDBConfigAutoFlushInterval; on success it
// is stored through SetConfigValueInMem under CONFIG_DB /
// CONFIG_DB_AUTO_FLUSH_INTERVAL and that call's Status is returned.
// NOTE(review): CONFIG_CHECK is defined outside this view; it presumably
// returns the failing Status early when the check does not pass — confirm.
Status
Config::SetDBConfigAutoFlushInterval(const std::string& value) {
CONFIG_CHECK(CheckDBConfigAutoFlushInterval(value));
return SetConfigValueInMem(CONFIG_DB, CONFIG_DB_AUTO_FLUSH_INTERVAL, value);
}
/* storage config */
Status
Config::SetStorageConfigPrimaryPath(const std::string& value) {
@ -1952,6 +1993,13 @@ Config::SetEngineConfigUseAVX512(const std::string& value) {
return SetConfigValueInMem(CONFIG_ENGINE, CONFIG_ENGINE_USE_AVX512, value);
}
/* tracing config */
// Sets tracing_config.json_config_path.
//
// The value is passed to CheckTracingConfigJsonConfigPath before being stored
// through SetConfigValueInMem under CONFIG_TRACING /
// CONFIG_TRACING_JSON_CONFIG_PATH.
// NOTE(review): the check currently rejects every value, so if CONFIG_CHECK
// returns early on failure (macro defined outside this view — confirm) the
// store below is never reached and this setter always reports an error.
Status
Config::SetTracingConfigJsonConfigPath(const std::string& value) {
CONFIG_CHECK(CheckTracingConfigJsonConfigPath(value));
return SetConfigValueInMem(CONFIG_TRACING, CONFIG_TRACING_JSON_CONFIG_PATH, value);
}
/* wal config */
Status
Config::SetWalConfigEnable(const std::string& value) {

View File

@ -283,6 +283,10 @@ class Config {
CheckGpuResourceConfigBuildIndexResources(const std::vector<std::string>& value);
#endif
/* tracing config */
// Validates a value for tracing_config.json_config_path. The current
// implementation rejects every value with SERVER_INVALID_ARGUMENT because this
// item is not supported to configure.
Status
CheckTracingConfigJsonConfigPath(const std::string& value);
/* wal config */
Status
CheckWalConfigEnable(const std::string& value);
@ -429,6 +433,8 @@ class Config {
SetDBConfigArchiveDiskThreshold(const std::string& value);
Status
SetDBConfigArchiveDaysThreshold(const std::string& value);
// Validates `value` with CheckDBConfigAutoFlushInterval and, if accepted,
// stores it in memory as db_config.auto_flush_interval.
Status
SetDBConfigAutoFlushInterval(const std::string& value);
/* storage config */
Status
@ -474,6 +480,10 @@ class Config {
Status
SetEngineConfigUseAVX512(const std::string& value);
/* tracing config */
// Runs CheckTracingConfigJsonConfigPath on `value` and then stores it in memory
// as tracing_config.json_config_path.
// NOTE(review): the check currently rejects all values, so this setter is
// expected to fail for any input — confirm against CONFIG_CHECK's definition.
Status
SetTracingConfigJsonConfigPath(const std::string& value);
/* wal config */
Status
SetWalConfigEnable(const std::string& value);

View File

@ -184,6 +184,11 @@ TEST_F(ConfigTest, SERVER_CONFIG_VALID_TEST) {
ASSERT_TRUE(config.GetDBConfigArchiveDaysThreshold(int64_val).ok());
ASSERT_TRUE(int64_val == db_archive_days_threshold);
int64_t db_auto_flush_interval = 1;
ASSERT_TRUE(config.SetDBConfigAutoFlushInterval(std::to_string(db_auto_flush_interval)).ok());
ASSERT_TRUE(config.GetDBConfigAutoFlushInterval(int64_val).ok());
ASSERT_TRUE(int64_val == db_auto_flush_interval);
/* storage config */
std::string storage_primary_path = "/home/zilliz";
ASSERT_TRUE(config.SetStorageConfigPrimaryPath(storage_primary_path).ok());
@ -573,6 +578,8 @@ TEST_F(ConfigTest, SERVER_CONFIG_INVALID_TEST) {
ASSERT_FALSE(config.SetDBConfigArchiveDaysThreshold("0x10").ok());
ASSERT_FALSE(config.SetDBConfigAutoFlushInterval("0.1").ok());
/* storage config */
ASSERT_FALSE(config.SetStorageConfigPrimaryPath("").ok());
@ -851,10 +858,10 @@ TEST_F(ConfigTest, SERVER_CONFIG_VALID_FAIL_TEST) {
ASSERT_FALSE(s.ok());
fiu_disable("ValidationUtil.GetGpuMemory.return_error");
fiu_enable("check_config_insert_buffer_size_fail", 1, NULL, 0);
s = config.GetCacheConfigCpuCacheCapacity(value);
ASSERT_FALSE(s.ok());
fiu_disable("check_config_insert_buffer_size_fail");
// fiu_enable("check_config_insert_buffer_size_fail", 1, NULL, 0);
// s = config.GetCacheConfigCpuCacheCapacity(value);
// ASSERT_FALSE(s.ok());
// fiu_disable("check_config_insert_buffer_size_fail");
fiu_enable("Config.CheckCacheConfigCpuCacheCapacity.large_insert_buffer", 1, NULL, 0);
s = config.GetCacheConfigCpuCacheCapacity(value);
@ -943,6 +950,11 @@ TEST_F(ConfigTest, SERVER_CONFIG_RESET_DEFAULT_CONFIG_FAIL_TEST) {
ASSERT_FALSE(s.ok());
fiu_disable("check_config_backend_url_fail");
fiu_enable("check_config_preload_table_fail", 1, NULL, 0);
s = config.ResetDefaultConfig();
ASSERT_FALSE(s.ok());
fiu_disable("check_config_preload_table_fail");
fiu_enable("check_config_archive_disk_threshold_fail", 1, NULL, 0);
s = config.ResetDefaultConfig();
ASSERT_FALSE(s.ok());
@ -953,6 +965,11 @@ TEST_F(ConfigTest, SERVER_CONFIG_RESET_DEFAULT_CONFIG_FAIL_TEST) {
ASSERT_FALSE(s.ok());
fiu_disable("check_config_archive_days_threshold_fail");
fiu_enable("check_config_auto_flush_interval_fail", 1, NULL, 0);
s = config.ResetDefaultConfig();
ASSERT_FALSE(s.ok());
fiu_disable("check_config_auto_flush_interval_fail");
fiu_enable("check_config_insert_buffer_size_fail", 1, NULL, 0);
s = config.ResetDefaultConfig();
ASSERT_FALSE(s.ok());

View File

@ -1627,7 +1627,7 @@ TEST_F(WebControllerTest, CONFIG) {
OString table_name = "milvus_test_webcontroller_test_preload_table";
GenTable(table_name, 16, 10, "L2");
OString table_name_s = "milvus_test_webcontroller_test_preload_table";
OString table_name_s = "milvus_test_webcontroller_test_preload_table_s";
GenTable(table_name_s, 16, 10, "L2");
OString body_str = "{\"db_config\": {\"preload_table\": \"" + table_name + "\"}}";