mirror of https://github.com/milvus-io/milvus.git
Support set segcore chunk_size via config file (#7635)
Signed-off-by: yudong.cai <yudong.cai@zilliz.com>pull/7755/head
parent
722aa8ac4d
commit
27dcf698d3
|
@ -0,0 +1,14 @@
|
|||
# Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
# or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
segcore:
|
||||
chunk_size: 32768
|
||||
|
|
@ -73,7 +73,7 @@ Search_SmallIndex(benchmark::State& state) {
|
|||
segconf.set_size_per_chunk(chunk_size);
|
||||
auto segment = CreateGrowingSegment(schema, segconf);
|
||||
if (!is_small_index) {
|
||||
segment->debug_disable_small_index();
|
||||
segment->disable_small_index();
|
||||
}
|
||||
segment->PreInsert(N);
|
||||
ColumnBasedRawData raw_data;
|
||||
|
|
|
@ -23,10 +23,6 @@ datatype_sizeof(DataType data_type, int dim = 1) {
|
|||
switch (data_type) {
|
||||
case DataType::BOOL:
|
||||
return sizeof(bool);
|
||||
case DataType::DOUBLE:
|
||||
return sizeof(double);
|
||||
case DataType::FLOAT:
|
||||
return sizeof(float);
|
||||
case DataType::INT8:
|
||||
return sizeof(int8_t);
|
||||
case DataType::INT16:
|
||||
|
@ -35,6 +31,10 @@ datatype_sizeof(DataType data_type, int dim = 1) {
|
|||
return sizeof(int32_t);
|
||||
case DataType::INT64:
|
||||
return sizeof(int64_t);
|
||||
case DataType::FLOAT:
|
||||
return sizeof(float);
|
||||
case DataType::DOUBLE:
|
||||
return sizeof(double);
|
||||
case DataType::VECTOR_FLOAT:
|
||||
return sizeof(float) * dim;
|
||||
case DataType::VECTOR_BINARY: {
|
||||
|
@ -43,7 +43,6 @@ datatype_sizeof(DataType data_type, int dim = 1) {
|
|||
}
|
||||
default: {
|
||||
throw std::invalid_argument("unsupported data type");
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -54,10 +53,6 @@ datatype_name(DataType data_type) {
|
|||
switch (data_type) {
|
||||
case DataType::BOOL:
|
||||
return "bool";
|
||||
case DataType::DOUBLE:
|
||||
return "double";
|
||||
case DataType::FLOAT:
|
||||
return "float";
|
||||
case DataType::INT8:
|
||||
return "int8_t";
|
||||
case DataType::INT16:
|
||||
|
@ -66,6 +61,10 @@ datatype_name(DataType data_type) {
|
|||
return "int32_t";
|
||||
case DataType::INT64:
|
||||
return "int64_t";
|
||||
case DataType::FLOAT:
|
||||
return "float";
|
||||
case DataType::DOUBLE:
|
||||
return "double";
|
||||
case DataType::VECTOR_FLOAT:
|
||||
return "vector_float";
|
||||
case DataType::VECTOR_BINARY: {
|
||||
|
@ -84,7 +83,7 @@ datatype_is_vector(DataType datatype) {
|
|||
}
|
||||
|
||||
inline bool
|
||||
datatype_is_interger(DataType datatype) {
|
||||
datatype_is_integer(DataType datatype) {
|
||||
switch (datatype) {
|
||||
case DataType::INT8:
|
||||
case DataType::INT16:
|
||||
|
@ -99,8 +98,8 @@ datatype_is_interger(DataType datatype) {
|
|||
inline bool
|
||||
datatype_is_floating(DataType datatype) {
|
||||
switch (datatype) {
|
||||
case DataType::DOUBLE:
|
||||
case DataType::FLOAT:
|
||||
case DataType::DOUBLE:
|
||||
return true;
|
||||
default:
|
||||
return false;
|
||||
|
@ -166,7 +165,7 @@ class FieldMeta {
|
|||
if (is_vector()) {
|
||||
return datatype_sizeof(type_, get_dim());
|
||||
} else {
|
||||
return datatype_sizeof(type_, 1);
|
||||
return datatype_sizeof(type_);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -23,11 +23,6 @@ class SpanBase {
|
|||
: data_(data), row_count_(row_count), element_sizeof_(element_sizeof) {
|
||||
}
|
||||
|
||||
int64_t
|
||||
get_element_sizeof() const {
|
||||
return element_sizeof_;
|
||||
}
|
||||
|
||||
int64_t
|
||||
row_count() const {
|
||||
return row_count_;
|
||||
|
|
|
@ -30,7 +30,7 @@ class BinaryVector : public VectorTrait {
|
|||
|
||||
template <typename VectorType>
|
||||
inline constexpr int64_t
|
||||
get_element_sizeof(int64_t dim) {
|
||||
element_sizeof(int64_t dim) {
|
||||
static_assert(std::is_base_of_v<VectorType, VectorTrait>);
|
||||
if constexpr (std::is_same_v<VectorType, FloatVector>) {
|
||||
return dim * sizeof(float);
|
||||
|
|
|
@ -42,16 +42,16 @@ apply_parser(const YAML::Node& node, Func func) {
|
|||
return results;
|
||||
}
|
||||
|
||||
SegcoreConfig
|
||||
void
|
||||
SegcoreConfig::parse_from(const std::string& config_path) {
|
||||
try {
|
||||
SegcoreConfig result;
|
||||
YAML::Node top_config = YAML::LoadFile(config_path);
|
||||
Assert(top_config.IsMap());
|
||||
auto seg_config = subnode(top_config, "segcore");
|
||||
auto chunk_size = subnode(seg_config, "chunk_size").as<int64_t>();
|
||||
result.size_per_chunk_ = chunk_size;
|
||||
this->size_per_chunk_ = chunk_size;
|
||||
|
||||
#if 0
|
||||
auto index_list = subnode(seg_config, "small_index");
|
||||
|
||||
Assert(index_list.IsSequence());
|
||||
|
@ -92,16 +92,16 @@ SegcoreConfig::parse_from(const std::string& config_path) {
|
|||
}
|
||||
|
||||
for (auto metric_type : metric_types) {
|
||||
Assert(result.table_.count(metric_type));
|
||||
Assert(result.table_.count(metric_type) == 0);
|
||||
result.table_[metric_type] = conf;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
#endif
|
||||
} catch (const SegcoreError& e) {
|
||||
// re-throw
|
||||
throw e;
|
||||
} catch (const std::exception& e) {
|
||||
PanicInfo(std::string("Invalid Yaml:\n") + e.what());
|
||||
std::string str = std::string("Invalid Yaml: ") + config_path + ", err: " + e.what();
|
||||
PanicInfo(str);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -10,10 +10,12 @@
|
|||
// or implied. See the License for the specific language governing permissions and limitations under the License
|
||||
|
||||
#pragma once
|
||||
#include "common/Types.h"
|
||||
#include "utils/Json.h"
|
||||
|
||||
#include <map>
|
||||
#include <string>
|
||||
#include "common/Types.h"
|
||||
#include "exceptions/EasyAssert.h"
|
||||
#include "utils/Json.h"
|
||||
|
||||
namespace milvus::segcore {
|
||||
|
||||
|
@ -24,23 +26,28 @@ struct SmallIndexConf {
|
|||
};
|
||||
|
||||
class SegcoreConfig {
|
||||
public:
|
||||
static SegcoreConfig
|
||||
parse_from(const std::string& string_path);
|
||||
static SegcoreConfig
|
||||
default_config() {
|
||||
// TODO: remove this when go side is ready
|
||||
SegcoreConfig config;
|
||||
config.set_size_per_chunk(32 * 1024);
|
||||
private:
|
||||
SegcoreConfig() {
|
||||
// hard code configurations for small index
|
||||
SmallIndexConf sub_conf;
|
||||
sub_conf.build_params["nlist"] = 100;
|
||||
sub_conf.search_params["nprobe"] = 4;
|
||||
sub_conf.index_type = "IVF";
|
||||
config.table_[MetricType::METRIC_L2] = sub_conf;
|
||||
config.table_[MetricType::METRIC_INNER_PRODUCT] = sub_conf;
|
||||
table_[MetricType::METRIC_L2] = sub_conf;
|
||||
table_[MetricType::METRIC_INNER_PRODUCT] = sub_conf;
|
||||
}
|
||||
|
||||
public:
|
||||
static SegcoreConfig&
|
||||
default_config() {
|
||||
// TODO: remove this when go side is ready
|
||||
static SegcoreConfig config;
|
||||
return config;
|
||||
}
|
||||
|
||||
void
|
||||
parse_from(const std::string& string_path);
|
||||
|
||||
const SmallIndexConf&
|
||||
at(MetricType metric_type) const {
|
||||
Assert(table_.count(metric_type));
|
||||
|
@ -62,11 +69,8 @@ class SegcoreConfig {
|
|||
table_[metric_type] = small_index_conf;
|
||||
}
|
||||
|
||||
protected:
|
||||
SegcoreConfig() = default;
|
||||
|
||||
private:
|
||||
int64_t size_per_chunk_ = -1;
|
||||
int64_t size_per_chunk_ = 32768;
|
||||
std::map<MetricType, SmallIndexConf> table_;
|
||||
};
|
||||
|
||||
|
|
|
@ -41,7 +41,7 @@ TestABI();
|
|||
class SegmentGrowing : public SegmentInternalInterface {
|
||||
public:
|
||||
virtual void
|
||||
debug_disable_small_index() = 0;
|
||||
disable_small_index() = 0;
|
||||
|
||||
virtual int64_t
|
||||
PreInsert(int64_t size) = 0;
|
||||
|
|
|
@ -202,9 +202,9 @@ SegmentGrowingImpl::do_insert(int64_t reserved_begin,
|
|||
}
|
||||
|
||||
record_.ack_responder_.AddSegment(reserved_begin, reserved_begin + size);
|
||||
if (!debug_disable_small_index_) {
|
||||
indexing_record_.UpdateResourceAck(record_.ack_responder_.GetAck() / segcore_config_.get_size_per_chunk(),
|
||||
record_);
|
||||
if (enable_small_index_) {
|
||||
int64_t chunk_size = segcore_config_.get_size_per_chunk();
|
||||
indexing_record_.UpdateResourceAck(record_.ack_responder_.GetAck() / chunk_size, record_);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -115,8 +115,8 @@ class SegmentGrowingImpl : public SegmentGrowing {
|
|||
public:
|
||||
// only for debug
|
||||
void
|
||||
debug_disable_small_index() override {
|
||||
debug_disable_small_index_ = true;
|
||||
disable_small_index() override {
|
||||
enable_small_index_ = false;
|
||||
}
|
||||
|
||||
ssize_t
|
||||
|
@ -217,7 +217,7 @@ class SegmentGrowingImpl : public SegmentGrowing {
|
|||
tbb::concurrent_unordered_multimap<idx_t, int64_t> uid2offset_;
|
||||
|
||||
private:
|
||||
bool debug_disable_small_index_ = false;
|
||||
bool enable_small_index_ = true;
|
||||
};
|
||||
|
||||
} // namespace milvus::segcore
|
||||
|
|
|
@ -9,15 +9,17 @@
|
|||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License
|
||||
|
||||
#include "index/thirdparty/faiss/FaissHook.h"
|
||||
#include "segcore/segcore_init_c.h"
|
||||
#include "knowhere/archive/KnowhereConfig.h"
|
||||
#include <iostream>
|
||||
|
||||
#include "exceptions/EasyAssert.h"
|
||||
#include "knowhere/archive/KnowhereConfig.h"
|
||||
#include "segcore/segcore_init_c.h"
|
||||
#include "segcore/SegcoreConfig.h"
|
||||
#include "utils/Log.h"
|
||||
|
||||
namespace milvus::segcore {
|
||||
static void
|
||||
SegcoreInitImpl() {
|
||||
SegcoreInitImpl(const char* config_dir) {
|
||||
namespace eg = milvus::engine;
|
||||
eg::KnowhereConfig::SetSimdType(eg::KnowhereConfig::SimdType::AUTO);
|
||||
eg::KnowhereConfig::SetBlasThreshold(16384);
|
||||
|
@ -26,10 +28,23 @@ SegcoreInitImpl() {
|
|||
eg::KnowhereConfig::SetStatisticsLevel(0);
|
||||
el::Configurations el_conf;
|
||||
el_conf.setGlobally(el::ConfigurationType::Enabled, std::to_string(false));
|
||||
|
||||
// initializing segcore config
|
||||
try {
|
||||
SegcoreConfig& config = SegcoreConfig::default_config();
|
||||
if (config_dir != NULL) {
|
||||
std::string config_file = std::string(config_dir) + "advanced/segcore.yaml";
|
||||
config.parse_from(config_file);
|
||||
std::cout << "Parse config file: " << config_file << ", chunk_size: " << config.get_size_per_chunk()
|
||||
<< std::endl;
|
||||
}
|
||||
} catch (std::exception& e) {
|
||||
PanicInfo("parse config fail: " + std::string(e.what()));
|
||||
}
|
||||
}
|
||||
} // namespace milvus::segcore
|
||||
|
||||
extern "C" void
|
||||
SegcoreInit() {
|
||||
milvus::segcore::SegcoreInitImpl();
|
||||
SegcoreInit(const char* config_dir) {
|
||||
milvus::segcore::SegcoreInitImpl(config_dir);
|
||||
}
|
||||
|
|
|
@ -8,12 +8,15 @@
|
|||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License
|
||||
|
||||
#pragma once
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
void
|
||||
SegcoreInit();
|
||||
SegcoreInit(const char* config_dir);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -19,5 +19,5 @@
|
|||
TEST(Init, Naive) {
|
||||
using namespace milvus;
|
||||
using namespace milvus::segcore;
|
||||
SegcoreInit();
|
||||
SegcoreInit(NULL);
|
||||
}
|
|
@ -71,7 +71,7 @@ TEST_P(PlanProtoTest, Range) {
|
|||
string value_tag = "bool_val";
|
||||
if (datatype_is_floating((DataType)data_type)) {
|
||||
value_tag = "float_val";
|
||||
} else if (datatype_is_interger((DataType)data_type)) {
|
||||
} else if (datatype_is_integer((DataType)data_type)) {
|
||||
value_tag = "int64_val";
|
||||
}
|
||||
|
||||
|
@ -152,7 +152,7 @@ TEST_P(PlanProtoTest, TermExpr) {
|
|||
string value_tag = "bool_val";
|
||||
if (datatype_is_floating((DataType)data_type)) {
|
||||
value_tag = "float_val";
|
||||
} else if (datatype_is_interger((DataType)data_type)) {
|
||||
} else if (datatype_is_integer((DataType)data_type)) {
|
||||
value_tag = "int64_val";
|
||||
}
|
||||
|
||||
|
@ -239,7 +239,7 @@ TEST(PlanProtoTest, NotExpr) {
|
|||
string value_tag = "bool_val";
|
||||
if (datatype_is_floating((DataType)data_type)) {
|
||||
value_tag = "float_val";
|
||||
} else if (datatype_is_interger((DataType)data_type)) {
|
||||
} else if (datatype_is_integer((DataType)data_type)) {
|
||||
value_tag = "int64_val";
|
||||
}
|
||||
|
||||
|
@ -330,7 +330,7 @@ TEST(PlanProtoTest, AndOrExpr) {
|
|||
string value_tag = "bool_val";
|
||||
if (datatype_is_floating((DataType)data_type)) {
|
||||
value_tag = "float_val";
|
||||
} else if (datatype_is_interger((DataType)data_type)) {
|
||||
} else if (datatype_is_integer((DataType)data_type)) {
|
||||
value_tag = "int64_val";
|
||||
}
|
||||
|
||||
|
|
|
@ -29,6 +29,7 @@ import (
|
|||
"errors"
|
||||
"strconv"
|
||||
"sync/atomic"
|
||||
"unsafe"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
|
@ -123,7 +124,9 @@ func (node *QueryNode) Init() error {
|
|||
node.etcdKV)
|
||||
node.streaming = newStreaming(node.queryNodeLoopCtx, node.msFactory, node.etcdKV)
|
||||
|
||||
C.SegcoreInit()
|
||||
cConfigDir := C.CString(Params.BaseTable.GetConfigDir())
|
||||
C.SegcoreInit(cConfigDir)
|
||||
C.free(unsafe.Pointer(cConfigDir))
|
||||
|
||||
if node.rootCoord == nil {
|
||||
log.Error("null root coordinator detected")
|
||||
|
|
|
@ -186,7 +186,7 @@ func newSegment(collection *Collection, segmentID int64, partitionID UniqueID, c
|
|||
return nil
|
||||
}
|
||||
|
||||
log.Debug("create segment", zap.Int64("segmentID", segmentID))
|
||||
log.Debug("create segment", zap.Int64("segmentID", segmentID), zap.Int32("segmentType", int32(segType)))
|
||||
|
||||
var segment = &Segment{
|
||||
segmentPtr: segmentPtr,
|
||||
|
|
|
@ -12,16 +12,17 @@
|
|||
package paramtable
|
||||
|
||||
import (
|
||||
"log"
|
||||
"os"
|
||||
"path"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
"go.uber.org/zap"
|
||||
|
||||
memkv "github.com/milvus-io/milvus/internal/kv/mem"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
"github.com/spf13/cast"
|
||||
"github.com/spf13/viper"
|
||||
|
@ -39,29 +40,42 @@ type Base interface {
|
|||
}
|
||||
|
||||
type BaseTable struct {
|
||||
params *memkv.MemoryKV
|
||||
params *memkv.MemoryKV
|
||||
configDir string
|
||||
}
|
||||
|
||||
func (gp *BaseTable) Init() {
|
||||
gp.params = memkv.NewMemoryKV()
|
||||
|
||||
err := gp.LoadYaml("milvus.yaml")
|
||||
if err != nil {
|
||||
_, fpath, _, _ := runtime.Caller(0)
|
||||
configDir := path.Dir(fpath) + "/../../../configs/"
|
||||
if _, err := os.Stat(configDir); err != nil {
|
||||
log.Warn("cannot access config directory", zap.String("configDir", configDir), zap.Error(err))
|
||||
if runPath, err1 := os.Getwd(); err1 != nil {
|
||||
panic(err1.Error())
|
||||
} else {
|
||||
configDir = runPath + "/configs/"
|
||||
}
|
||||
}
|
||||
gp.configDir = configDir
|
||||
log.Debug("config directory", zap.String("configDir", gp.configDir))
|
||||
|
||||
if err := gp.LoadYaml("milvus.yaml"); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
err = gp.LoadYaml("advanced/common.yaml")
|
||||
if err != nil {
|
||||
if err := gp.LoadYaml("advanced/common.yaml"); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
err = gp.LoadYaml("advanced/channel.yaml")
|
||||
if err != nil {
|
||||
if err := gp.LoadYaml("advanced/channel.yaml"); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
gp.tryloadFromEnv()
|
||||
}
|
||||
|
||||
func (gp *BaseTable) GetConfigDir() string {
|
||||
return gp.configDir
|
||||
}
|
||||
|
||||
func (gp *BaseTable) LoadFromKVPair(kvPairs []*commonpb.KeyValuePair) error {
|
||||
for _, pair := range kvPairs {
|
||||
err := gp.Save(pair.Key, pair.Value)
|
||||
|
@ -215,15 +229,9 @@ func (gp *BaseTable) LoadRange(key, endKey string, limit int) ([]string, []strin
|
|||
|
||||
func (gp *BaseTable) LoadYaml(fileName string) error {
|
||||
config := viper.New()
|
||||
_, fpath, _, _ := runtime.Caller(0)
|
||||
configFile := path.Dir(fpath) + "/../../../configs/" + fileName
|
||||
_, err := os.Stat(configFile)
|
||||
if os.IsNotExist(err) {
|
||||
runPath, err := os.Getwd()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
configFile = runPath + "/configs/" + fileName
|
||||
configFile := gp.configDir + fileName
|
||||
if _, err := os.Stat(configFile); err != nil {
|
||||
panic("cannot access config file: " + configFile)
|
||||
}
|
||||
|
||||
config.SetConfigFile(configFile)
|
||||
|
@ -241,7 +249,7 @@ func (gp *BaseTable) LoadYaml(fileName string) error {
|
|||
for _, v := range val {
|
||||
ss, err := cast.ToStringE(v)
|
||||
if err != nil {
|
||||
log.Panic(err)
|
||||
panic(err)
|
||||
}
|
||||
if len(str) == 0 {
|
||||
str = ss
|
||||
|
@ -251,7 +259,7 @@ func (gp *BaseTable) LoadYaml(fileName string) error {
|
|||
}
|
||||
|
||||
default:
|
||||
log.Panicf("undefine config type, key=%s", key)
|
||||
panic("undefined config type, key=" + key)
|
||||
}
|
||||
}
|
||||
err = gp.params.Save(strings.ToLower(key), str)
|
||||
|
|
Loading…
Reference in New Issue