Former-commit-id: 2525272b679569dffdd50287773fc28370d4d8c9
pull/191/head
zhiru 2019-06-27 11:15:47 +08:00
commit aad7c82400
9 changed files with 106 additions and 76 deletions

View File

@ -11,6 +11,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-89 - Fix compile failed, libgpufaiss.a link missing
- MS-90 - Fix arch match incorrect on ARM
- MS-99 - Fix compilation bug
- MS-110 - Avoid huge file size
## Improvement
- MS-82 - Update server startup welcome message
@ -19,7 +20,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-22 - Enhancement for MemVector size control
- MS-92 - Unify behavior of debug and release build
- MS-98 - Install all unit test to installation directory
- MS-115 - Change is_startup of metric_config switch from true to on
## New Feature
- MS-57 - Implement index load/search pipeline

View File

@ -1,29 +1,29 @@
server_config:
address: 0.0.0.0
port: 19530
transfer_protocol: binary #optional: binary, compact, json
server_mode: thread_pool #optional: simple, thread_pool
gpu_index: 0 #which gpu to be used
mode: single #optional: single, cluster
port: 19530 # the port milvus listen to, default: 19530, range: 1025 ~ 65534
gpu_index: 0 # the gpu milvus use, default: 0, range: 0 ~ gpu number - 1
mode: single # milvus deployment type: single, cluster
db_config:
db_path: /tmp/milvus
db_path: /tmp/milvus # milvus data storage path
#URI format: dialect://username:password@host:port/database
#All parts except dialect are optional, but you MUST include the delimiters
db_backend_url: mysql://root:1234@:/test
index_building_threshold: 1024 #build index file when raw data file size larger than this value, unit: MB
#Currently supports mysql or sqlite
db_backend_url: dialect://username:password@host:port/database # meta database uri
index_building_threshold: 1024 # index building trigger threshold, default: 1024, unit: MB
metric_config:
is_startup: true # true is on, false is off
collector: prometheus # prometheus, now we only have prometheus
prometheus_config:
collect_type: pull # pull means prometheus pull the message from server, push means server push metric to push gateway
port: 8080
push_gateway_ip_address: 127.0.0.1
push_gateway_port: 9091
is_startup: off # whether monitoring is enabled: on, off
collector: prometheus # metrics collector: prometheus
prometheus_config: # prometheus configuration follows
collect_type: pull # prometheus collect data method
port: 8080 # the port prometheus use to fetch metrics
push_gateway_ip_address: 127.0.0.1 # push-mode setting: push gateway ip address
push_gateway_port: 9091 # push-mode setting: push gateway port
license_config:
license_path: "/tmp/system.license"
cache_config:
cpu_cache_capacity: 16 # memory pool to hold index data, unit: GB
license_config: # license configuration
license_path: "/tmp/system.license" # license file path
cache_config: # cache configuration
cpu_cache_capacity: 16 # how much memory is used as cache, unit: GB, range: 0 ~ less than total memory

View File

@ -479,7 +479,7 @@ void DBImpl::StartCompactionTask() {
}
//serialize memory data
std::vector<std::string> temp_table_ids;
std::set<std::string> temp_table_ids;
mem_mgr_->Serialize(temp_table_ids);
for(auto& id : temp_table_ids) {
compact_table_ids_.insert(id);
@ -550,7 +550,8 @@ Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
ENGINE_LOG_DEBUG << "New merged file " << table_file.file_id_ <<
" of size=" << index->PhysicalSize()/(1024*1024) << " M";
index->Cache();
//currently disabled to avoid excessive memory usage
//index->Cache();
return status;
}
@ -670,7 +671,8 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
<< index->PhysicalSize()/(1024*1024) << " M"
<< " from file " << to_remove.file_id_;
index->Cache();
//currently disabled to avoid excessive memory usage
//index->Cache();
} catch (std::exception& ex) {
return Status::Error("Build index encounter exception", ex.what());
@ -709,7 +711,7 @@ Status DBImpl::Size(uint64_t& result) {
DBImpl::~DBImpl() {
shutting_down_.store(true, std::memory_order_release);
bg_timer_thread_.join();
std::vector<std::string> ids;
std::set<std::string> ids;
mem_mgr_->Serialize(ids);
}

View File

@ -20,36 +20,54 @@ namespace engine {
MemVectors::MemVectors(const std::shared_ptr<meta::Meta>& meta_ptr,
const meta::TableFileSchema& schema, const Options& options)
: pMeta_(meta_ptr),
: meta_(meta_ptr),
options_(options),
schema_(schema),
pIdGenerator_(new SimpleIDGenerator()),
pEE_(EngineFactory::Build(schema_.dimension_, schema_.location_, (EngineType)schema_.engine_type_)) {
id_generator_(new SimpleIDGenerator()),
active_engine_(EngineFactory::Build(schema_.dimension_, schema_.location_, (EngineType)schema_.engine_type_)) {
}
void MemVectors::Add(size_t n_, const float* vectors_, IDNumbers& vector_ids_) {
Status MemVectors::Add(size_t n_, const float* vectors_, IDNumbers& vector_ids_) {
if(active_engine_ == nullptr) {
return Status::Error("index engine is null");
}
auto start_time = METRICS_NOW_TIME;
pIdGenerator_->GetNextIDNumbers(n_, vector_ids_);
pEE_->AddWithIds(n_, vectors_, vector_ids_.data());
id_generator_->GetNextIDNumbers(n_, vector_ids_);
Status status = active_engine_->AddWithIds(n_, vectors_, vector_ids_.data());
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().AddVectorsPerSecondGaugeSet(static_cast<int>(n_), static_cast<int>(schema_.dimension_), total_time);
return status;
}
size_t MemVectors::Total() const {
return pEE_->Count();
size_t MemVectors::RowCount() const {
if(active_engine_ == nullptr) {
return 0;
}
return active_engine_->Count();
}
size_t MemVectors::ApproximateSize() const {
return pEE_->Size();
size_t MemVectors::Size() const {
if(active_engine_ == nullptr) {
return 0;
}
return active_engine_->Size();
}
Status MemVectors::Serialize(std::string& table_id) {
if(active_engine_ == nullptr) {
return Status::Error("index engine is null");
}
table_id = schema_.table_id_;
auto size = ApproximateSize();
auto size = Size();
auto start_time = METRICS_NOW_TIME;
pEE_->Serialize();
active_engine_->Serialize();
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
schema_.size_ = size;
@ -59,20 +77,20 @@ Status MemVectors::Serialize(std::string& table_id) {
schema_.file_type_ = (size >= options_.index_trigger_size) ?
meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW;
auto status = pMeta_->UpdateTableFile(schema_);
auto status = meta_->UpdateTableFile(schema_);
LOG(DEBUG) << "New " << ((schema_.file_type_ == meta::TableFileSchema::RAW) ? "raw" : "to_index")
<< " file " << schema_.file_id_ << " of size " << (double)(pEE_->Size()) / (double)meta::M << " M";
<< " file " << schema_.file_id_ << " of size " << (double)(active_engine_->Size()) / (double)meta::M << " M";
pEE_->Cache();
active_engine_->Cache();
return status;
}
MemVectors::~MemVectors() {
if (pIdGenerator_ != nullptr) {
delete pIdGenerator_;
pIdGenerator_ = nullptr;
if (id_generator_ != nullptr) {
delete id_generator_;
id_generator_ = nullptr;
}
}
@ -81,20 +99,20 @@ MemVectors::~MemVectors() {
*/
MemManager::MemVectorsPtr MemManager::GetMemByTable(
const std::string& table_id) {
auto memIt = memMap_.find(table_id);
if (memIt != memMap_.end()) {
auto memIt = mem_id_map_.find(table_id);
if (memIt != mem_id_map_.end()) {
return memIt->second;
}
meta::TableFileSchema table_file;
table_file.table_id_ = table_id;
auto status = pMeta_->CreateTableFile(table_file);
auto status = meta_->CreateTableFile(table_file);
if (!status.ok()) {
return nullptr;
}
memMap_[table_id] = MemVectorsPtr(new MemVectors(pMeta_, table_file, options_));
return memMap_[table_id];
mem_id_map_[table_id] = MemVectorsPtr(new MemVectors(meta_, table_file, options_));
return mem_id_map_[table_id];
}
Status MemManager::InsertVectors(const std::string& table_id_,
@ -114,37 +132,44 @@ Status MemManager::InsertVectorsNoLock(const std::string& table_id,
if (mem == nullptr) {
return Status::NotFound("Group " + table_id + " not found!");
}
mem->Add(n, vectors, vector_ids);
return Status::OK();
//make sure each file's size is less than index_trigger_size
if(mem->Size() > options_.index_trigger_size) {
std::unique_lock<std::mutex> lock(serialization_mtx_);
immu_mem_list_.push_back(mem);
mem_id_map_.erase(table_id);
return InsertVectorsNoLock(table_id, n, vectors, vector_ids);
} else {
return mem->Add(n, vectors, vector_ids);
}
}
Status MemManager::ToImmutable() {
std::unique_lock<std::mutex> lock(mutex_);
for (auto& kv: memMap_) {
immMems_.push_back(kv.second);
for (auto& kv: mem_id_map_) {
immu_mem_list_.push_back(kv.second);
}
memMap_.clear();
mem_id_map_.clear();
return Status::OK();
}
Status MemManager::Serialize(std::vector<std::string>& table_ids) {
Status MemManager::Serialize(std::set<std::string>& table_ids) {
ToImmutable();
std::unique_lock<std::mutex> lock(serialization_mtx_);
std::string table_id;
table_ids.clear();
for (auto& mem : immMems_) {
for (auto& mem : immu_mem_list_) {
mem->Serialize(table_id);
table_ids.push_back(table_id);
table_ids.insert(table_id);
}
immMems_.clear();
immu_mem_list_.clear();
return Status::OK();
}
Status MemManager::EraseMemVector(const std::string& table_id) {
std::unique_lock<std::mutex> lock(mutex_);
memMap_.erase(table_id);
mem_id_map_.erase(table_id);
return Status::OK();
}

View File

@ -15,6 +15,7 @@
#include <ctime>
#include <memory>
#include <mutex>
#include <set>
namespace zilliz {
namespace milvus {
@ -32,11 +33,11 @@ public:
explicit MemVectors(const std::shared_ptr<meta::Meta>&,
const meta::TableFileSchema&, const Options&);
void Add(size_t n_, const float* vectors_, IDNumbers& vector_ids_);
Status Add(size_t n_, const float* vectors_, IDNumbers& vector_ids_);
size_t Total() const;
size_t RowCount() const;
size_t ApproximateSize() const;
size_t Size() const;
Status Serialize(std::string& table_id);
@ -49,11 +50,11 @@ private:
MemVectors(const MemVectors&) = delete;
MemVectors& operator=(const MemVectors&) = delete;
MetaPtr pMeta_;
MetaPtr meta_;
Options options_;
meta::TableFileSchema schema_;
IDGenerator* pIdGenerator_;
ExecutionEnginePtr pEE_;
IDGenerator* id_generator_;
ExecutionEnginePtr active_engine_;
}; // MemVectors
@ -66,14 +67,14 @@ public:
using Ptr = std::shared_ptr<MemManager>;
MemManager(const std::shared_ptr<meta::Meta>& meta, const Options& options)
: pMeta_(meta), options_(options) {}
: meta_(meta), options_(options) {}
MemVectorsPtr GetMemByTable(const std::string& table_id);
Status InsertVectors(const std::string& table_id,
size_t n, const float* vectors, IDNumbers& vector_ids);
Status Serialize(std::vector<std::string>& table_ids);
Status Serialize(std::set<std::string>& table_ids);
Status EraseMemVector(const std::string& table_id);
@ -82,11 +83,11 @@ private:
size_t n, const float* vectors, IDNumbers& vector_ids);
Status ToImmutable();
using MemMap = std::map<std::string, MemVectorsPtr>;
using ImmMemPool = std::vector<MemVectorsPtr>;
MemMap memMap_;
ImmMemPool immMems_;
MetaPtr pMeta_;
using MemIdMap = std::map<std::string, MemVectorsPtr>;
using MemList = std::vector<MemVectorsPtr>;
MemIdMap mem_id_map_;
MemList immu_mem_list_;
MetaPtr meta_;
Options options_;
std::mutex mutex_;
std::mutex serialization_mtx_;

View File

@ -25,7 +25,7 @@ public:
conns_in_use_ = 0;
maxIdleTime_ = 300; //300ms
maxIdleTime_ = 10; //10 seconds
}
// The destructor. We _must_ call ConnectionPool::clear() here,
@ -56,7 +56,7 @@ public:
// ENGINE_LOG_DEBUG << "conns_in_use_ in release: " << conns_in_use_ << std::endl;
--conns_in_use_;
if (conns_in_use_ < 0) {
ENGINE_LOG_ERROR << "conns_in_use_ in release less than zero: " << conns_in_use_ << std::endl;
ENGINE_LOG_ERROR << "MySQLConnetionPool::release: conns_in_use_ is less than zero. conns_in_use_ = " << conns_in_use_ << std::endl;
}
}

View File

@ -320,7 +320,7 @@ namespace meta {
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
Query createTableQuery = connectionPtr->query();
ENGINE_LOG_DEBUG << "Create Table in";
// ENGINE_LOG_DEBUG << "Create Table in";
if (table_schema.table_id_.empty()) {
NextTableId(table_schema.table_id_);
} else {

View File

@ -17,7 +17,8 @@ ServerError
PrometheusMetrics::Init() {
try {
ConfigNode &configNode = ServerConfig::GetInstance().GetConfig(CONFIG_METRIC);
startup_ = configNode.GetValue(CONFIG_METRIC_IS_STARTUP) == "true" ? true : false;
startup_ = configNode.GetValue(CONFIG_METRIC_IS_STARTUP) == "on";
if(!startup_) return SERVER_SUCCESS;
// Following should be read from config file.
const std::string bind_address = configNode.GetChild(CONFIG_PROMETHEUS).GetValue(CONFIG_METRIC_PROMETHEUS_PORT);
const std::string uri = std::string("/metrics");

View File

@ -33,7 +33,7 @@ TEST_F(DBTest, Metric_Tes) {
server::Metrics::GetInstance().Init();
// server::PrometheusMetrics::GetInstance().exposer_ptr()->RegisterCollectable(server::PrometheusMetrics::GetInstance().registry_ptr());
zilliz::milvus::cache::CpuCacheMgr::GetInstance()->SetCapacity(2UL*1024*1024*1024);
zilliz::milvus::cache::CpuCacheMgr::GetInstance()->SetCapacity(1UL*1024*1024*1024);
std::cout<<zilliz::milvus::cache::CpuCacheMgr::GetInstance()->CacheCapacity()<<std::endl;
static const std::string group_name = "test_group";
@ -102,7 +102,7 @@ TEST_F(DBTest, Metric_Tes) {
}
});
int loop = 10;
int loop = 10000;
for (auto i=0; i<loop; ++i) {
if (i==40) {