Delete pulsar address test and refactor master param table

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/4973/head^2
bigsheeper 2020-11-23 18:04:09 +08:00 committed by yefu.chen
parent c7a49c97cc
commit 84f3d974fa
33 changed files with 942 additions and 652 deletions

View File

@ -17,41 +17,7 @@ func main() {
// Creates server.
ctx, cancel := context.WithCancel(context.Background())
etcdAddress := master.Params.EtcdAddress()
etcdRootPath := master.Params.EtcdRootPath()
pulsarAddr := master.Params.PulsarAddress()
defaultRecordSize := master.Params.DefaultRecordSize()
minimumAssignSize := master.Params.MinimumAssignSize()
segmentThreshold := master.Params.SegmentThreshold()
segmentExpireDuration := master.Params.SegmentExpireDuration()
numOfChannel := master.Params.TopicNum()
nodeNum, _ := master.Params.QueryNodeNum()
statsChannel := master.Params.StatsChannels()
opt := master.Option{
KVRootPath: etcdRootPath,
MetaRootPath: etcdRootPath,
EtcdAddr: []string{etcdAddress},
PulsarAddr: pulsarAddr,
ProxyIDs: master.Params.ProxyIDList(),
PulsarProxyChannels: master.Params.ProxyTimeSyncChannels(),
PulsarProxySubName: master.Params.ProxyTimeSyncSubName(),
SoftTTBInterval: master.Params.SoftTimeTickBarrierInterval(),
WriteIDs: master.Params.WriteIDList(),
PulsarWriteChannels: master.Params.WriteTimeSyncChannels(),
PulsarWriteSubName: master.Params.WriteTimeSyncSubName(),
PulsarDMChannels: master.Params.DMTimeSyncChannels(),
PulsarK2SChannels: master.Params.K2STimeSyncChannels(),
DefaultRecordSize: defaultRecordSize,
MinimumAssignSize: minimumAssignSize,
SegmentThreshold: segmentThreshold,
SegmentExpireDuration: segmentExpireDuration,
NumOfChannel: numOfChannel,
NumOfQueryNode: nodeNum,
StatsChannels: statsChannel,
}
svr, err := master.CreateServer(ctx, &opt)
svr, err := master.CreateServer(ctx)
if err != nil {
log.Print("create server failed", zap.Error(err))
}
@ -69,7 +35,7 @@ func main() {
cancel()
}()
if err := svr.Run(int64(master.Params.Port())); err != nil {
if err := svr.Run(int64(master.Params.Port)); err != nil {
log.Fatal("run server failed", zap.Error(err))
}

View File

@ -15,6 +15,7 @@ master:
segment:
# old name: segmentThreshold: 536870912
size: 512 # MB
sizeFactor: 0.75
defaultSizePerRecord: 1024
minIDAssignCnt: 1024
maxIDAssignCnt: 16384

View File

@ -31,6 +31,7 @@ master:
minimumAssignSize: 1048576
segmentThreshold: 536870912
segmentExpireDuration: 2000
segmentThresholdFactor: 0.75
querynodenum: 1
writenodenum: 1
statsChannels: "statistic"

View File

@ -8,6 +8,7 @@ set(MILVUS_QUERY_SRCS
visitors/ShowExprVisitor.cpp
visitors/ExecExprVisitor.cpp
Plan.cpp
Search.cpp
)
add_library(milvus_query ${MILVUS_QUERY_SRCS})
target_link_libraries(milvus_query milvus_proto)

View File

@ -0,0 +1,107 @@
#include "Search.h"
#include <knowhere/index/vector_index/adapter/VectorAdapter.h>
#include <knowhere/index/vector_index/VecIndexFactory.h>
#include "segcore/Reduce.h"
#include <faiss/utils/distances.h>
#include "utils/tools.h"
namespace milvus::query {
static faiss::ConcurrentBitsetPtr
create_bitmap_view(std::optional<const BitmapSimple*> bitmaps_opt, int64_t chunk_id) {
if (!bitmaps_opt.has_value()) {
return nullptr;
}
auto& bitmaps = *bitmaps_opt.value();
auto& src_vec = bitmaps.at(chunk_id);
auto dst = std::make_shared<faiss::ConcurrentBitset>(src_vec.size());
boost::to_block_range(src_vec, dst->mutable_data());
return dst;
}
using namespace segcore;
Status
QueryBruteForceImpl(const SegmentSmallIndex& segment,
const query::QueryInfo& info,
const float* query_data,
int64_t num_queries,
Timestamp timestamp,
std::optional<const BitmapSimple*> bitmaps_opt,
QueryResult& results) {
auto& record = segment.get_insert_record();
auto& schema = segment.get_schema();
auto& indexing_record = segment.get_indexing_record();
// step 1: binary search to find the barrier of the snapshot
auto ins_barrier = get_barrier(record, timestamp);
auto max_chunk = upper_div(ins_barrier, DefaultElementPerChunk);
// auto del_barrier = get_barrier(deleted_record_, timestamp);
#if 0
auto bitmap_holder = get_deleted_bitmap(del_barrier, timestamp, ins_barrier);
Assert(bitmap_holder);
auto bitmap = bitmap_holder->bitmap_ptr;
#endif
// step 2.1: get meta
// step 2.2: get which vector field to search
auto vecfield_offset_opt = schema.get_offset(info.field_id_);
Assert(vecfield_offset_opt.has_value());
auto vecfield_offset = vecfield_offset_opt.value();
auto& field = schema[vecfield_offset];
auto vec_ptr = record.get_vec_entity<float>(vecfield_offset);
Assert(field.get_data_type() == DataType::VECTOR_FLOAT);
auto dim = field.get_dim();
auto topK = info.topK_;
auto total_count = topK * num_queries;
// TODO: optimize
// step 3: small indexing search
std::vector<int64_t> final_uids(total_count, -1);
std::vector<float> final_dis(total_count, std::numeric_limits<float>::max());
auto max_indexed_id = indexing_record.get_finished_ack();
const auto& indexing_entry = indexing_record.get_indexing(vecfield_offset);
auto search_conf = indexing_entry.get_search_conf(topK);
for (int chunk_id = 0; chunk_id < max_indexed_id; ++chunk_id) {
auto indexing = indexing_entry.get_indexing(chunk_id);
auto src_data = vec_ptr->get_chunk(chunk_id).data();
auto dataset = knowhere::GenDataset(num_queries, dim, src_data);
auto bitmap_view = create_bitmap_view(bitmaps_opt, chunk_id);
auto ans = indexing->Query(dataset, search_conf, bitmap_view);
auto dis = ans->Get<float*>(milvus::knowhere::meta::DISTANCE);
auto uids = ans->Get<int64_t*>(milvus::knowhere::meta::IDS);
merge_into(num_queries, topK, final_dis.data(), final_uids.data(), dis, uids);
}
// step 4: brute force search where small indexing is unavailable
for (int chunk_id = max_indexed_id; chunk_id < max_chunk; ++chunk_id) {
std::vector<int64_t> buf_uids(total_count, -1);
std::vector<float> buf_dis(total_count, std::numeric_limits<float>::max());
faiss::float_maxheap_array_t buf = {(size_t)num_queries, (size_t)topK, buf_uids.data(), buf_dis.data()};
auto src_data = vec_ptr->get_chunk(chunk_id).data();
auto nsize =
chunk_id != max_chunk - 1 ? DefaultElementPerChunk : ins_barrier - chunk_id * DefaultElementPerChunk;
auto bitmap_view = create_bitmap_view(bitmaps_opt, chunk_id);
faiss::knn_L2sqr(query_data, src_data, dim, num_queries, nsize, &buf, bitmap_view);
merge_into(num_queries, topK, final_dis.data(), final_uids.data(), buf_dis.data(), buf_uids.data());
}
// step 5: convert offset to uids
for (auto& id : final_uids) {
if (id == -1) {
continue;
}
id = record.uids_[id];
}
results.result_ids_ = std::move(final_uids);
results.result_distances_ = std::move(final_dis);
results.topK_ = topK;
results.num_queries_ = num_queries;
return Status::OK();
}
} // namespace milvus::query

View File

@ -0,0 +1,19 @@
#pragma once
#include <optional>
#include "segcore/SegmentSmallIndex.h"
#include <boost/dynamic_bitset.hpp>
namespace milvus::query {
using BitmapChunk = boost::dynamic_bitset<>;
using BitmapSimple = std::deque<BitmapChunk>;
// note: c++17 don't support optional ref
Status
QueryBruteForceImpl(const segcore::SegmentSmallIndex& segment,
const QueryInfo& info,
const float* query_data,
int64_t num_queries,
Timestamp timestamp,
std::optional<const BitmapSimple*> bitmap_opt,
segcore::QueryResult& results);
} // namespace milvus::query

View File

@ -4,6 +4,7 @@
#include "segcore/SegmentSmallIndex.h"
#include <optional>
#include "query/ExprImpl.h"
#include "boost/dynamic_bitset.hpp"
#include "ExprVisitor.h"
namespace milvus::query {
@ -22,7 +23,7 @@ class ExecExprVisitor : ExprVisitor {
visit(RangeExpr& expr) override;
public:
using RetType = std::vector<std::vector<bool>>;
using RetType = std::deque<boost::dynamic_bitset<>>;
explicit ExecExprVisitor(segcore::SegmentSmallIndex& segment) : segment_(segment) {
}
RetType

View File

@ -1,6 +1,7 @@
#include "segcore/SegmentSmallIndex.h"
#include <optional>
#include "query/ExprImpl.h"
#include "boost/dynamic_bitset.hpp"
#include "query/generated/ExecExprVisitor.h"
namespace milvus::query {
@ -10,7 +11,7 @@ namespace milvus::query {
namespace impl {
class ExecExprVisitor : ExprVisitor {
public:
using RetType = std::vector<std::vector<bool>>;
using RetType = std::deque<boost::dynamic_bitset<>>;
explicit ExecExprVisitor(segcore::SegmentSmallIndex& segment) : segment_(segment) {
}
RetType
@ -66,7 +67,7 @@ ExecExprVisitor::ExecRangeVisitorImpl(RangeExprImpl<T>& expr, Func func) -> RetT
auto& field_meta = schema[field_offset];
auto vec_ptr = records.get_scalar_entity<T>(field_offset);
auto& vec = *vec_ptr;
std::vector<std::vector<bool>> results(vec.chunk_size());
RetType results(vec.chunk_size());
for (auto chunk_id = 0; chunk_id < vec.chunk_size(); ++chunk_id) {
auto& result = results[chunk_id];
result.resize(segcore::DefaultElementPerChunk);

View File

@ -3,6 +3,8 @@
#include "segcore/SegmentBase.h"
#include "query/generated/ExecPlanNodeVisitor.h"
#include "segcore/SegmentSmallIndex.h"
#include "query/generated/ExecExprVisitor.h"
#include "query/Search.h"
namespace milvus::query {
@ -49,7 +51,12 @@ ExecPlanNodeVisitor::visit(FloatVectorANNS& node) {
auto& ph = placeholder_group_.at(0);
auto src_data = ph.get_blob<float>();
auto num_queries = ph.num_of_queries_;
segment->QueryBruteForceImpl(node.query_info_, src_data, num_queries, timestamp_, ret);
if (node.predicate_.has_value()) {
auto bitmap = ExecExprVisitor(*segment).call_child(*node.predicate_.value());
auto ptr = &bitmap;
QueryBruteForceImpl(*segment, node.query_info_, src_data, num_queries, timestamp_, ptr, ret);
}
QueryBruteForceImpl(*segment, node.query_info_, src_data, num_queries, timestamp_, std::nullopt, ret);
ret_ = ret;
}

View File

@ -37,6 +37,5 @@ class AckResponder {
std::shared_mutex mutex_;
std::set<int64_t> acks_ = {0};
std::atomic<int64_t> minimum_ = 0;
// std::atomic<int64_t> maximum_ = 0;
};
} // namespace milvus::segcore

View File

@ -4,6 +4,7 @@
#include "common/Schema.h"
#include "knowhere/index/vector_index/IndexIVF.h"
#include <memory>
#include "segcore/Record.h"
namespace milvus::segcore {

View File

@ -67,7 +67,7 @@ class IndexingRecord {
// concurrent
int64_t
get_finished_ack() {
get_finished_ack() const {
return finished_ack_.GetAck();
}

View File

@ -2,6 +2,7 @@
#include "common/Schema.h"
#include "ConcurrentVector.h"
#include "AckResponder.h"
#include "segcore/Record.h"
namespace milvus::segcore {
struct InsertRecord {

View File

@ -0,0 +1,21 @@
#pragma once
#include "common/Schema.h"
namespace milvus::segcore {
template <typename RecordType>
inline int64_t
get_barrier(const RecordType& record, Timestamp timestamp) {
auto& vec = record.timestamps_;
int64_t beg = 0;
int64_t end = record.ack_responder_.GetAck();
while (beg < end) {
auto mid = (beg + end) / 2;
if (vec[mid] < timestamp) {
beg = mid + 1;
} else {
end = mid;
}
}
return beg;
}
} // namespace milvus::segcore

View File

@ -52,10 +52,7 @@ class SegmentBase {
virtual Status
Delete(int64_t reserved_offset, int64_t size, const int64_t* row_ids, const Timestamp* timestamps) = 0;
// query contains metadata of
virtual Status
QueryDeprecated(query::QueryDeprecatedPtr query, Timestamp timestamp, QueryResult& results) = 0;
public:
virtual Status
Search(const query::Plan* Plan,
const query::PlaceholderGroup* placeholder_groups[],

View File

@ -219,23 +219,6 @@ SegmentNaive::Delete(int64_t reserved_begin, int64_t size, const int64_t* uids_r
// return Status::OK();
}
template <typename RecordType>
int64_t
get_barrier(const RecordType& record, Timestamp timestamp) {
auto& vec = record.timestamps_;
int64_t beg = 0;
int64_t end = record.ack_responder_.GetAck();
while (beg < end) {
auto mid = (beg + end) / 2;
if (vec[mid] < timestamp) {
beg = mid + 1;
} else {
end = mid;
}
}
return beg;
}
Status
SegmentNaive::QueryImpl(query::QueryDeprecatedPtr query_info, Timestamp timestamp, QueryResult& result) {
auto ins_barrier = get_barrier(record_, timestamp);

View File

@ -41,10 +41,12 @@ class SegmentNaive : public SegmentBase {
Status
Delete(int64_t reserverd_offset, int64_t size, const int64_t* row_ids, const Timestamp* timestamps) override;
// query contains metadata of
private:
// NOTE: now deprecated, remains for further copy out
Status
QueryDeprecated(query::QueryDeprecatedPtr query_info, Timestamp timestamp, QueryResult& results) override;
QueryDeprecated(query::QueryDeprecatedPtr query_info, Timestamp timestamp, QueryResult& results);
public:
Status
Search(const query::Plan* Plan,
const query::PlaceholderGroup* placeholder_groups[],

View File

@ -204,137 +204,6 @@ SegmentSmallIndex::Delete(int64_t reserved_begin,
// return Status::OK();
}
template <typename RecordType>
int64_t
get_barrier(const RecordType& record, Timestamp timestamp) {
auto& vec = record.timestamps_;
int64_t beg = 0;
int64_t end = record.ack_responder_.GetAck();
while (beg < end) {
auto mid = (beg + end) / 2;
if (vec[mid] < timestamp) {
beg = mid + 1;
} else {
end = mid;
}
}
return beg;
}
Status
SegmentSmallIndex::QueryBruteForceImpl(const query::QueryInfo& info,
const float* query_data,
int64_t num_queries,
Timestamp timestamp,
QueryResult& results) {
// step 1: binary search to find the barrier of the snapshot
auto ins_barrier = get_barrier(record_, timestamp);
// auto del_barrier = get_barrier(deleted_record_, timestamp);
#if 0
auto bitmap_holder = get_deleted_bitmap(del_barrier, timestamp, ins_barrier);
Assert(bitmap_holder);
auto bitmap = bitmap_holder->bitmap_ptr;
#endif
// step 2.1: get meta
// step 2.2: get which vector field to search
auto vecfield_offset_opt = schema_->get_offset(info.field_id_);
Assert(vecfield_offset_opt.has_value());
auto vecfield_offset = vecfield_offset_opt.value();
Assert(vecfield_offset < record_.entity_vec_.size());
auto& field = schema_->operator[](vecfield_offset);
auto vec_ptr = std::static_pointer_cast<ConcurrentVector<float>>(record_.entity_vec_.at(vecfield_offset));
Assert(field.get_data_type() == DataType::VECTOR_FLOAT);
auto dim = field.get_dim();
auto topK = info.topK_;
auto total_count = topK * num_queries;
// TODO: optimize
// step 3: small indexing search
std::vector<int64_t> final_uids(total_count, -1);
std::vector<float> final_dis(total_count, std::numeric_limits<float>::max());
auto max_chunk = (ins_barrier + DefaultElementPerChunk - 1) / DefaultElementPerChunk;
auto max_indexed_id = indexing_record_.get_finished_ack();
const auto& indexing_entry = indexing_record_.get_indexing(vecfield_offset);
auto search_conf = indexing_entry.get_search_conf(topK);
for (int chunk_id = 0; chunk_id < max_indexed_id; ++chunk_id) {
auto indexing = indexing_entry.get_indexing(chunk_id);
auto src_data = vec_ptr->get_chunk(chunk_id).data();
auto dataset = knowhere::GenDataset(num_queries, dim, src_data);
auto ans = indexing->Query(dataset, search_conf, nullptr);
auto dis = ans->Get<float*>(milvus::knowhere::meta::DISTANCE);
auto uids = ans->Get<int64_t*>(milvus::knowhere::meta::IDS);
merge_into(num_queries, topK, final_dis.data(), final_uids.data(), dis, uids);
}
// step 4: brute force search where small indexing is unavailable
for (int chunk_id = max_indexed_id; chunk_id < max_chunk; ++chunk_id) {
std::vector<int64_t> buf_uids(total_count, -1);
std::vector<float> buf_dis(total_count, std::numeric_limits<float>::max());
faiss::float_maxheap_array_t buf = {(size_t)num_queries, (size_t)topK, buf_uids.data(), buf_dis.data()};
auto src_data = vec_ptr->get_chunk(chunk_id).data();
auto nsize =
chunk_id != max_chunk - 1 ? DefaultElementPerChunk : ins_barrier - chunk_id * DefaultElementPerChunk;
faiss::knn_L2sqr(query_data, src_data, dim, num_queries, nsize, &buf);
merge_into(num_queries, topK, final_dis.data(), final_uids.data(), buf_dis.data(), buf_uids.data());
}
// step 5: convert offset to uids
for (auto& id : final_uids) {
if (id == -1) {
continue;
}
id = record_.uids_[id];
}
results.result_ids_ = std::move(final_uids);
results.result_distances_ = std::move(final_dis);
results.topK_ = topK;
results.num_queries_ = num_queries;
// throw std::runtime_error("unimplemented");
return Status::OK();
}
Status
SegmentSmallIndex::QueryDeprecated(query::QueryDeprecatedPtr query_info, Timestamp timestamp, QueryResult& result) {
// TODO: enable delete
// TODO: enable index
// TODO: remove mock
if (query_info == nullptr) {
query_info = std::make_shared<query::QueryDeprecated>();
query_info->field_name = "fakevec";
query_info->topK = 10;
query_info->num_queries = 1;
auto dim = schema_->operator[]("fakevec").get_dim();
std::default_random_engine e(42);
std::uniform_real_distribution<> dis(0.0, 1.0);
query_info->query_raw_data.resize(query_info->num_queries * dim);
for (auto& x : query_info->query_raw_data) {
x = dis(e);
}
}
// TODO
query::QueryInfo info{
query_info->topK,
query_info->field_name,
"L2",
nlohmann::json{
{"nprobe", 10},
},
};
auto num_queries = query_info->num_queries;
return QueryBruteForceImpl(info, query_info->query_raw_data.data(), num_queries, timestamp, result);
}
Status
SegmentSmallIndex::Close() {
if (this->record_.reserved != this->record_.ack_responder_.GetAck()) {

View File

@ -65,10 +65,6 @@ class SegmentSmallIndex : public SegmentBase {
Status
Delete(int64_t reserverd_offset, int64_t size, const int64_t* row_ids, const Timestamp* timestamps) override;
// query contains metadata of
Status
QueryDeprecated(query::QueryDeprecatedPtr query_info, Timestamp timestamp, QueryResult& results) override;
Status
Search(const query::Plan* Plan,
const query::PlaceholderGroup* placeholder_groups[],
@ -112,6 +108,16 @@ class SegmentSmallIndex : public SegmentBase {
return record_;
}
const IndexingRecord&
get_indexing_record() const {
return indexing_record_;
}
const DeletedRecord&
get_deleted_record() const {
return deleted_record_;
}
const Schema&
get_schema() const {
return *schema_;
@ -148,12 +154,12 @@ class SegmentSmallIndex : public SegmentBase {
// Status
// QueryBruteForceImpl(query::QueryPtr query, Timestamp timestamp, QueryResult& results);
Status
QueryBruteForceImpl(const query::QueryInfo& info,
const float* query_data,
int64_t num_queries,
Timestamp timestamp,
QueryResult& results);
// Status
// QueryBruteForceImpl(const query::QueryInfo& info,
// const float* query_data,
// int64_t num_queries,
// Timestamp timestamp,
// QueryResult& results);
template <typename Type>
knowhere::IndexPtr
@ -172,4 +178,5 @@ class SegmentSmallIndex : public SegmentBase {
// std::unordered_map<std::string, knowhere::IndexPtr> indexings_; // index_name => indexing
tbb::concurrent_unordered_multimap<idx_t, int64_t> uid2offset_;
};
} // namespace milvus::segcore

View File

@ -23,37 +23,44 @@ func TestMaster_CollectionTask(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
etcdAddr := Params.EtcdAddress()
etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}})
assert.Nil(t, err)
_, err = etcdCli.Delete(ctx, "/test/root", clientv3.WithPrefix())
assert.Nil(t, err)
opt := Option{
KVRootPath: "/test/root/kv",
MetaRootPath: "/test/root/meta",
EtcdAddr: []string{etcdAddr},
PulsarAddr: Params.PulsarAddress(),
ProxyIDs: []typeutil.UniqueID{1, 2},
PulsarProxyChannels: []string{"proxy1", "proxy2"},
PulsarProxySubName: "proxyTopics",
SoftTTBInterval: 300,
WriteIDs: []typeutil.UniqueID{3, 4},
PulsarWriteChannels: []string{"write3", "write4"},
PulsarWriteSubName: "writeTopics",
PulsarDMChannels: []string{"dm0", "dm1"},
PulsarK2SChannels: []string{"k2s0", "k2s1"},
Params = ParamTable{
Address: Params.Address,
Port: Params.Port,
EtcdAddress: Params.EtcdAddress,
EtcdRootPath: "/test/root",
PulsarAddress: Params.PulsarAddress,
ProxyIDList: []typeutil.UniqueID{1, 2},
WriteNodeIDList: []typeutil.UniqueID{3, 4},
TopicNum: 5,
QueryNodeNum: 3,
SoftTimeTickBarrierInterval: 300,
// segment
SegmentSize: 536870912 / 1024 / 1024,
SegmentSizeFactor: 0.75,
DefaultRecordSize: 1024,
MinimumAssignSize: 1048576,
SegmentThreshold: 536870912,
SegmentExpireDuration: 2000,
NumOfChannel: 5,
NumOfQueryNode: 3,
StatsChannels: "statistic",
MinSegIDAssignCnt: 1048576 / 1024,
MaxSegIDAssignCnt: Params.MaxSegIDAssignCnt,
SegIDAssignExpiration: 2000,
// msgChannel
ProxyTimeTickChannelNames: []string{"proxy1", "proxy2"},
WriteNodeTimeTickChannelNames: []string{"write3", "write4"},
InsertChannelNames: []string{"dm0", "dm1"},
K2SChannelNames: []string{"k2s0", "k2s1"},
QueryNodeStatsChannelName: "statistic",
MsgChannelSubName: Params.MsgChannelSubName,
}
svr, err := CreateServer(ctx, &opt)
svr, err := CreateServer(ctx)
assert.Nil(t, err)
err = svr.Run(10002)
assert.Nil(t, err)

View File

@ -15,7 +15,7 @@ var gTestIDAllocator *GlobalIDAllocator
func TestMain(m *testing.M) {
Params.Init()
etcdAddr := Params.EtcdAddress()
etcdAddr := Params.EtcdAddress
gTestTsoAllocator = NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase([]string{etcdAddr}, "/test/root/kv", "tso"))
gTestIDAllocator = NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{etcdAddr}, "/test/root/kv", "gid"))
exitCode := m.Run()

View File

@ -21,36 +21,45 @@ func TestMaster_CreateCollection(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
etcdAddr := Params.EtcdAddress()
etcdAddr := Params.EtcdAddress
etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)
_, err = etcdCli.Delete(ctx, "/test/root", clientv3.WithPrefix())
assert.Nil(t, err)
opt := Option{
KVRootPath: "/test/root/kv",
MetaRootPath: "/test/root/meta",
EtcdAddr: []string{etcdAddr},
PulsarAddr: Params.PulsarAddress(),
ProxyIDs: []typeutil.UniqueID{1, 2},
PulsarProxyChannels: []string{"proxy1", "proxy2"},
PulsarProxySubName: "proxyTopics",
SoftTTBInterval: 300,
WriteIDs: []typeutil.UniqueID{3, 4},
PulsarWriteChannels: []string{"write3", "write4"},
PulsarWriteSubName: "writeTopics",
PulsarDMChannels: []string{"dm0", "dm1"},
PulsarK2SChannels: []string{"k2s0", "k2s1"},
Params = ParamTable{
Address: Params.Address,
Port: Params.Port,
EtcdAddress: Params.EtcdAddress,
EtcdRootPath: "/test/root",
PulsarAddress: Params.PulsarAddress,
ProxyIDList: []typeutil.UniqueID{1, 2},
WriteNodeIDList: []typeutil.UniqueID{3, 4},
TopicNum: 5,
QueryNodeNum: 3,
SoftTimeTickBarrierInterval: 300,
// segment
SegmentSize: 536870912 / 1024 / 1024,
SegmentSizeFactor: 0.75,
DefaultRecordSize: 1024,
MinimumAssignSize: 1048576,
SegmentThreshold: 536870912,
SegmentExpireDuration: 2000,
NumOfChannel: 5,
NumOfQueryNode: 3,
StatsChannels: "statistic",
MinSegIDAssignCnt: 1048576 / 1024,
MaxSegIDAssignCnt: Params.MaxSegIDAssignCnt,
SegIDAssignExpiration: 2000,
// msgChannel
ProxyTimeTickChannelNames: []string{"proxy1", "proxy2"},
WriteNodeTimeTickChannelNames: []string{"write3", "write4"},
InsertChannelNames: []string{"dm0", "dm1"},
K2SChannelNames: []string{"k2s0", "k2s1"},
QueryNodeStatsChannelName: "statistic",
MsgChannelSubName: Params.MsgChannelSubName,
}
svr, err := CreateServer(ctx, &opt)
svr, err := CreateServer(ctx)
assert.Nil(t, err)
err = svr.Run(10001)
assert.Nil(t, err)

View File

@ -27,36 +27,6 @@ type (
Timestamp = typeutil.Timestamp
)
type Option struct {
KVRootPath string
MetaRootPath string
EtcdAddr []string
PulsarAddr string
////softTimeTickBarrier
ProxyIDs []typeutil.UniqueID
PulsarProxyChannels []string //TimeTick
PulsarProxySubName string
SoftTTBInterval Timestamp //Physical Time + Logical Time
//hardTimeTickBarrier
WriteIDs []typeutil.UniqueID
PulsarWriteChannels []string
PulsarWriteSubName string
PulsarDMChannels []string
PulsarK2SChannels []string
DefaultRecordSize int64
MinimumAssignSize int64
SegmentThreshold float64
SegmentExpireDuration int64
NumOfChannel int
NumOfQueryNode int
StatsChannels string
}
type Master struct {
// Server state.
isServing int64
@ -105,18 +75,22 @@ func newKVBase(kvRoot string, etcdAddr []string) *kv.EtcdKV {
func Init() {
rand.Seed(time.Now().UnixNano())
Params.InitParamTable()
Params.Init()
}
// CreateServer creates the UNINITIALIZED pd server with given configuration.
func CreateServer(ctx context.Context, opt *Option) (*Master, error) {
func CreateServer(ctx context.Context) (*Master, error) {
//Init(etcdAddr, kvRootPath)
etcdAddress := Params.EtcdAddress
metaRootPath := Params.EtcdRootPath
kvRootPath := Params.EtcdRootPath
pulsarAddr := Params.PulsarAddress
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: opt.EtcdAddr})
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}})
if err != nil {
return nil, err
}
etcdkv := kv.NewEtcdKV(etcdClient, opt.MetaRootPath)
etcdkv := kv.NewEtcdKV(etcdClient, metaRootPath)
metakv, err := NewMetaTable(etcdkv)
if err != nil {
return nil, err
@ -128,41 +102,41 @@ func CreateServer(ctx context.Context, opt *Option) (*Master, error) {
return nil, err
}
pulsarProxyStream := ms.NewPulsarMsgStream(ctx, 1024) //output stream
pulsarProxyStream.SetPulsarClient(opt.PulsarAddr)
pulsarProxyStream.CreatePulsarConsumers(opt.PulsarProxyChannels, opt.PulsarProxySubName, ms.NewUnmarshalDispatcher(), 1024)
pulsarProxyStream.SetPulsarClient(pulsarAddr)
pulsarProxyStream.CreatePulsarConsumers(Params.ProxyTimeTickChannelNames, Params.MsgChannelSubName, ms.NewUnmarshalDispatcher(), 1024)
pulsarProxyStream.Start()
var proxyStream ms.MsgStream = pulsarProxyStream
proxyTimeTickBarrier := newSoftTimeTickBarrier(ctx, &proxyStream, opt.ProxyIDs, opt.SoftTTBInterval)
proxyTimeTickBarrier := newSoftTimeTickBarrier(ctx, &proxyStream, Params.ProxyIDList, Params.SoftTimeTickBarrierInterval)
tsMsgProducer.SetProxyTtBarrier(proxyTimeTickBarrier)
pulsarWriteStream := ms.NewPulsarMsgStream(ctx, 1024) //output stream
pulsarWriteStream.SetPulsarClient(opt.PulsarAddr)
pulsarWriteStream.CreatePulsarConsumers(opt.PulsarWriteChannels, opt.PulsarWriteSubName, ms.NewUnmarshalDispatcher(), 1024)
pulsarWriteStream.SetPulsarClient(pulsarAddr)
pulsarWriteStream.CreatePulsarConsumers(Params.WriteNodeTimeTickChannelNames, Params.MsgChannelSubName, ms.NewUnmarshalDispatcher(), 1024)
pulsarWriteStream.Start()
var writeStream ms.MsgStream = pulsarWriteStream
writeTimeTickBarrier := newHardTimeTickBarrier(ctx, &writeStream, opt.WriteIDs)
writeTimeTickBarrier := newHardTimeTickBarrier(ctx, &writeStream, Params.WriteNodeIDList)
tsMsgProducer.SetWriteNodeTtBarrier(writeTimeTickBarrier)
pulsarDMStream := ms.NewPulsarMsgStream(ctx, 1024) //input stream
pulsarDMStream.SetPulsarClient(opt.PulsarAddr)
pulsarDMStream.CreatePulsarProducers(opt.PulsarDMChannels)
pulsarDMStream.SetPulsarClient(pulsarAddr)
pulsarDMStream.CreatePulsarProducers(Params.InsertChannelNames)
tsMsgProducer.SetDMSyncStream(pulsarDMStream)
pulsarK2SStream := ms.NewPulsarMsgStream(ctx, 1024) //input stream
pulsarK2SStream.SetPulsarClient(opt.PulsarAddr)
pulsarK2SStream.CreatePulsarProducers(opt.PulsarK2SChannels)
pulsarK2SStream.SetPulsarClient(pulsarAddr)
pulsarK2SStream.CreatePulsarProducers(Params.K2SChannelNames)
tsMsgProducer.SetK2sSyncStream(pulsarK2SStream)
// stats msg stream
statsMs := ms.NewPulsarMsgStream(ctx, 1024)
statsMs.SetPulsarClient(opt.PulsarAddr)
statsMs.CreatePulsarConsumers([]string{opt.StatsChannels}, "SegmentStats", ms.NewUnmarshalDispatcher(), 1024)
statsMs.SetPulsarClient(pulsarAddr)
statsMs.CreatePulsarConsumers([]string{Params.QueryNodeStatsChannelName}, "SegmentStats", ms.NewUnmarshalDispatcher(), 1024)
statsMs.Start()
m := &Master{
ctx: ctx,
startTimestamp: time.Now().Unix(),
kvBase: newKVBase(opt.KVRootPath, opt.EtcdAddr),
kvBase: newKVBase(kvRootPath, []string{etcdAddress}),
metaTable: metakv,
timesSyncMsgProducer: tsMsgProducer,
grpcErr: make(chan error),
@ -170,19 +144,19 @@ func CreateServer(ctx context.Context, opt *Option) (*Master, error) {
}
//init idAllocator
m.idAllocator = NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase(opt.EtcdAddr, opt.KVRootPath, "gid"))
m.idAllocator = NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{etcdAddress}, kvRootPath, "gid"))
if err := m.idAllocator.Initialize(); err != nil {
return nil, err
}
//init tsoAllocator
m.tsoAllocator = NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase(opt.EtcdAddr, opt.KVRootPath, "tso"))
m.tsoAllocator = NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase([]string{etcdAddress}, kvRootPath, "tso"))
if err := m.tsoAllocator.Initialize(); err != nil {
return nil, err
}
m.scheduler = NewDDRequestScheduler(func() (UniqueID, error) { return m.idAllocator.AllocOne() })
m.segmentMgr = NewSegmentManager(metakv, opt,
m.segmentMgr = NewSegmentManager(metakv,
func() (UniqueID, error) { return m.idAllocator.AllocOne() },
func() (Timestamp, error) { return m.tsoAllocator.AllocOne() },
)

View File

@ -15,7 +15,7 @@ import (
func TestMetaTable_Collection(t *testing.T) {
Init()
etcdAddr := Params.EtcdAddress()
etcdAddr := Params.EtcdAddress
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)
etcdKV := kv.NewEtcdKV(cli, "/etcd/test/root")
@ -152,7 +152,7 @@ func TestMetaTable_Collection(t *testing.T) {
func TestMetaTable_DeletePartition(t *testing.T) {
Init()
etcdAddr := Params.EtcdAddress()
etcdAddr := Params.EtcdAddress
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)
@ -243,7 +243,7 @@ func TestMetaTable_DeletePartition(t *testing.T) {
func TestMetaTable_Segment(t *testing.T) {
Init()
etcdAddr := Params.EtcdAddress()
etcdAddr := Params.EtcdAddress
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)
@ -324,7 +324,7 @@ func TestMetaTable_Segment(t *testing.T) {
func TestMetaTable_UpdateSegment(t *testing.T) {
Init()
etcdAddr := Params.EtcdAddress()
etcdAddr := Params.EtcdAddress
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)

View File

@ -0,0 +1,398 @@
package master
import (
"log"
"strconv"
"strings"
"github.com/zilliztech/milvus-distributed/internal/util/paramtable"
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
type ParamTable struct {
paramtable.BaseTable
Address string
Port int
EtcdAddress string
EtcdRootPath string
PulsarAddress string
// nodeID
ProxyIDList []typeutil.UniqueID
WriteNodeIDList []typeutil.UniqueID
TopicNum int
QueryNodeNum int
SoftTimeTickBarrierInterval typeutil.Timestamp
// segment
SegmentSize float64
SegmentSizeFactor float64
DefaultRecordSize int64
MinSegIDAssignCnt int64
MaxSegIDAssignCnt int64
SegIDAssignExpiration int64
// msgChannel
ProxyTimeTickChannelNames []string
WriteNodeTimeTickChannelNames []string
InsertChannelNames []string
K2SChannelNames []string
QueryNodeStatsChannelName string
MsgChannelSubName string
}
var Params ParamTable
func (p *ParamTable) Init() {
// load yaml
p.BaseTable.Init()
err := p.LoadYaml("milvus.yaml")
if err != nil {
panic(err)
}
err = p.LoadYaml("advanced/channel.yaml")
if err != nil {
panic(err)
}
err = p.LoadYaml("advanced/master.yaml")
if err != nil {
panic(err)
}
// set members
p.initAddress()
p.initPort()
p.initEtcdAddress()
p.initEtcdRootPath()
p.initPulsarAddress()
p.initProxyIDList()
p.initWriteNodeIDList()
p.initTopicNum()
p.initQueryNodeNum()
p.initSoftTimeTickBarrierInterval()
p.initSegmentSize()
p.initSegmentSizeFactor()
p.initDefaultRecordSize()
p.initMinSegIDAssignCnt()
p.initMaxSegIDAssignCnt()
p.initSegIDAssignExpiration()
p.initProxyTimeTickChannelNames()
p.initWriteNodeTimeTickChannelNames()
p.initInsertChannelNames()
p.initK2SChannelNames()
p.initQueryNodeStatsChannelName()
p.initMsgChannelSubName()
}
func (p *ParamTable) initAddress() {
masterAddress, err := p.Load("master.address")
if err != nil {
panic(err)
}
p.Address = masterAddress
}
func (p *ParamTable) initPort() {
masterPort, err := p.Load("master.port")
if err != nil {
panic(err)
}
port, err := strconv.Atoi(masterPort)
if err != nil {
panic(err)
}
p.Port = port
}
func (p *ParamTable) initEtcdAddress() {
addr, err := p.Load("_EtcdAddress")
if err != nil {
panic(err)
}
p.EtcdAddress = addr
}
func (p *ParamTable) initPulsarAddress() {
addr, err := p.Load("_PulsarAddress")
if err != nil {
panic(err)
}
p.PulsarAddress = addr
}
func (p *ParamTable) initEtcdRootPath() {
path, err := p.Load("etcd.rootpath")
if err != nil {
panic(err)
}
p.EtcdRootPath = path
}
func (p *ParamTable) initTopicNum() {
insertChannelRange, err := p.Load("msgChannel.channelRange.insert")
if err != nil {
panic(err)
}
channelRange := strings.Split(insertChannelRange, ",")
if len(channelRange) != 2 {
panic("Illegal channel range num")
}
channelBegin, err := strconv.Atoi(channelRange[0])
if err != nil {
panic(err)
}
channelEnd, err := strconv.Atoi(channelRange[1])
if err != nil {
panic(err)
}
if channelBegin < 0 || channelEnd < 0 {
panic("Illegal channel range value")
}
if channelBegin > channelEnd {
panic("Illegal channel range value")
}
p.TopicNum = channelEnd
}
func (p *ParamTable) initSegmentSize() {
threshold, err := p.Load("master.segment.size")
if err != nil {
panic(err)
}
segmentThreshold, err := strconv.ParseFloat(threshold, 64)
if err != nil {
panic(err)
}
p.SegmentSize = segmentThreshold
}
func (p *ParamTable) initSegmentSizeFactor() {
segFactor, err := p.Load("master.segment.sizeFactor")
if err != nil {
panic(err)
}
factor, err := strconv.ParseFloat(segFactor, 64)
if err != nil {
panic(err)
}
p.SegmentSizeFactor = factor
}
func (p *ParamTable) initDefaultRecordSize() {
size, err := p.Load("master.segment.defaultSizePerRecord")
if err != nil {
panic(err)
}
res, err := strconv.ParseInt(size, 10, 64)
if err != nil {
panic(err)
}
p.DefaultRecordSize = res
}
func (p *ParamTable) initMinSegIDAssignCnt() {
size, err := p.Load("master.segment.minIDAssignCnt")
if err != nil {
panic(err)
}
res, err := strconv.ParseInt(size, 10, 64)
if err != nil {
panic(err)
}
p.MinSegIDAssignCnt = res
}
func (p *ParamTable) initMaxSegIDAssignCnt() {
size, err := p.Load("master.segment.maxIDAssignCnt")
if err != nil {
panic(err)
}
res, err := strconv.ParseInt(size, 10, 64)
if err != nil {
panic(err)
}
p.MaxSegIDAssignCnt = res
}
func (p *ParamTable) initSegIDAssignExpiration() {
duration, err := p.Load("master.segment.IDAssignExpiration")
if err != nil {
panic(err)
}
res, err := strconv.ParseInt(duration, 10, 64)
if err != nil {
panic(err)
}
p.SegIDAssignExpiration = res
}
func (p *ParamTable) initQueryNodeNum() {
id, err := p.Load("nodeID.queryNodeIDList")
if err != nil {
panic(err)
}
ids := strings.Split(id, ",")
for _, i := range ids {
_, err := strconv.ParseInt(i, 10, 64)
if err != nil {
log.Panicf("load proxy id list error, %s", err.Error())
}
}
p.QueryNodeNum = len(ids)
}
func (p *ParamTable) initQueryNodeStatsChannelName() {
channels, err := p.Load("msgChannel.chanNamePrefix.queryNodeStats")
if err != nil {
panic(err)
}
p.QueryNodeStatsChannelName = channels
}
func (p *ParamTable) initProxyIDList() {
id, err := p.Load("nodeID.proxyIDList")
if err != nil {
log.Panicf("load proxy id list error, %s", err.Error())
}
ids := strings.Split(id, ",")
idList := make([]typeutil.UniqueID, 0, len(ids))
for _, i := range ids {
v, err := strconv.ParseInt(i, 10, 64)
if err != nil {
log.Panicf("load proxy id list error, %s", err.Error())
}
idList = append(idList, typeutil.UniqueID(v))
}
p.ProxyIDList = idList
}
func (p *ParamTable) initProxyTimeTickChannelNames() {
ch, err := p.Load("msgChannel.chanNamePrefix.proxyTimeTick")
if err != nil {
log.Panic(err)
}
id, err := p.Load("nodeID.proxyIDList")
if err != nil {
log.Panicf("load proxy id list error, %s", err.Error())
}
ids := strings.Split(id, ",")
channels := make([]string, 0, len(ids))
for _, i := range ids {
_, err := strconv.ParseInt(i, 10, 64)
if err != nil {
log.Panicf("load proxy id list error, %s", err.Error())
}
channels = append(channels, ch+"-"+i)
}
p.ProxyTimeTickChannelNames = channels
}
func (p *ParamTable) initMsgChannelSubName() {
name, err := p.Load("msgChannel.subNamePrefix.masterSubNamePrefix")
if err != nil {
log.Panic(err)
}
p.MsgChannelSubName = name
}
func (p *ParamTable) initSoftTimeTickBarrierInterval() {
t, err := p.Load("master.timeSync.softTimeTickBarrierInterval")
if err != nil {
log.Panic(err)
}
v, err := strconv.ParseInt(t, 10, 64)
if err != nil {
log.Panic(err)
}
p.SoftTimeTickBarrierInterval = tsoutil.ComposeTS(v, 0)
}
func (p *ParamTable) initWriteNodeIDList() {
id, err := p.Load("nodeID.writeNodeIDList")
if err != nil {
log.Panic(err)
}
ids := strings.Split(id, ",")
idlist := make([]typeutil.UniqueID, 0, len(ids))
for _, i := range ids {
v, err := strconv.ParseInt(i, 10, 64)
if err != nil {
log.Panicf("load proxy id list error, %s", err.Error())
}
idlist = append(idlist, typeutil.UniqueID(v))
}
p.WriteNodeIDList = idlist
}
func (p *ParamTable) initWriteNodeTimeTickChannelNames() {
ch, err := p.Load("msgChannel.chanNamePrefix.writeNodeTimeTick")
if err != nil {
log.Fatal(err)
}
id, err := p.Load("nodeID.writeNodeIDList")
if err != nil {
log.Panicf("load write node id list error, %s", err.Error())
}
ids := strings.Split(id, ",")
channels := make([]string, 0, len(ids))
for _, i := range ids {
_, err := strconv.ParseInt(i, 10, 64)
if err != nil {
log.Panicf("load write node id list error, %s", err.Error())
}
channels = append(channels, ch+"-"+i)
}
p.WriteNodeTimeTickChannelNames = channels
}
func (p *ParamTable) initInsertChannelNames() {
ch, err := p.Load("msgChannel.chanNamePrefix.insert")
if err != nil {
log.Fatal(err)
}
id, err := p.Load("nodeID.queryNodeIDList")
if err != nil {
log.Panicf("load query node id list error, %s", err.Error())
}
ids := strings.Split(id, ",")
channels := make([]string, 0, len(ids))
for _, i := range ids {
_, err := strconv.ParseInt(i, 10, 64)
if err != nil {
log.Panicf("load query node id list error, %s", err.Error())
}
channels = append(channels, ch+"-"+i)
}
p.InsertChannelNames = channels
}
func (p *ParamTable) initK2SChannelNames() {
ch, err := p.Load("msgChannel.chanNamePrefix.k2s")
if err != nil {
log.Fatal(err)
}
id, err := p.Load("nodeID.writeNodeIDList")
if err != nil {
log.Panicf("load write node id list error, %s", err.Error())
}
ids := strings.Split(id, ",")
channels := make([]string, 0, len(ids))
for _, i := range ids {
_, err := strconv.ParseInt(i, 10, 64)
if err != nil {
log.Panicf("load write node id list error, %s", err.Error())
}
channels = append(channels, ch+"-"+i)
}
p.K2SChannelNames = channels
}

View File

@ -0,0 +1,143 @@
package master
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestParamTable_Init(t *testing.T) {
Params.Init()
}
func TestParamTable_Address(t *testing.T) {
Params.Init()
address := Params.Address
assert.Equal(t, address, "localhost")
}
func TestParamTable_Port(t *testing.T) {
Params.Init()
port := Params.Port
assert.Equal(t, port, 53100)
}
func TestParamTable_EtcdRootPath(t *testing.T) {
Params.Init()
addr := Params.EtcdRootPath
assert.Equal(t, addr, "by-dev")
}
func TestParamTable_TopicNum(t *testing.T) {
Params.Init()
num := Params.TopicNum
assert.Equal(t, num, 15)
}
func TestParamTable_SegmentSize(t *testing.T) {
Params.Init()
size := Params.SegmentSize
assert.Equal(t, size, float64(512))
}
func TestParamTable_SegmentSizeFactor(t *testing.T) {
Params.Init()
factor := Params.SegmentSizeFactor
assert.Equal(t, factor, 0.75)
}
func TestParamTable_DefaultRecordSize(t *testing.T) {
Params.Init()
size := Params.DefaultRecordSize
assert.Equal(t, size, int64(1024))
}
func TestParamTable_MinSegIDAssignCnt(t *testing.T) {
Params.Init()
cnt := Params.MinSegIDAssignCnt
assert.Equal(t, cnt, int64(1024))
}
func TestParamTable_MaxSegIDAssignCnt(t *testing.T) {
Params.Init()
cnt := Params.MaxSegIDAssignCnt
assert.Equal(t, cnt, int64(16384))
}
func TestParamTable_SegIDAssignExpiration(t *testing.T) {
Params.Init()
expiration := Params.SegIDAssignExpiration
assert.Equal(t, expiration, int64(2000))
}
func TestParamTable_QueryNodeNum(t *testing.T) {
Params.Init()
num := Params.QueryNodeNum
assert.Equal(t, num, 2)
}
func TestParamTable_QueryNodeStatsChannelName(t *testing.T) {
Params.Init()
name := Params.QueryNodeStatsChannelName
assert.Equal(t, name, "query-node-stats")
}
func TestParamTable_ProxyIDList(t *testing.T) {
Params.Init()
ids := Params.ProxyIDList
assert.Equal(t, len(ids), 2)
assert.Equal(t, ids[0], int64(1))
assert.Equal(t, ids[1], int64(2))
}
func TestParamTable_ProxyTimeTickChannelNames(t *testing.T) {
Params.Init()
names := Params.ProxyTimeTickChannelNames
assert.Equal(t, len(names), 2)
assert.Equal(t, names[0], "proxyTimeTick-1")
assert.Equal(t, names[1], "proxyTimeTick-2")
}
func TestParamTable_MsgChannelSubName(t *testing.T) {
Params.Init()
name := Params.MsgChannelSubName
assert.Equal(t, name, "master")
}
func TestParamTable_SoftTimeTickBarrierInterval(t *testing.T) {
Params.Init()
interval := Params.SoftTimeTickBarrierInterval
assert.Equal(t, interval, Timestamp(0x7d00000))
}
func TestParamTable_WriteNodeIDList(t *testing.T) {
Params.Init()
ids := Params.WriteNodeIDList
assert.Equal(t, len(ids), 2)
assert.Equal(t, ids[0], int64(5))
assert.Equal(t, ids[1], int64(6))
}
func TestParamTable_WriteNodeTimeTickChannelNames(t *testing.T) {
Params.Init()
names := Params.WriteNodeTimeTickChannelNames
assert.Equal(t, len(names), 2)
assert.Equal(t, names[0], "writeNodeTimeTick-5")
assert.Equal(t, names[1], "writeNodeTimeTick-6")
}
func TestParamTable_InsertChannelNames(t *testing.T) {
Params.Init()
names := Params.InsertChannelNames
assert.Equal(t, len(names), 2)
assert.Equal(t, names[0], "insert-3")
assert.Equal(t, names[1], "insert-4")
}
func TestParamTable_K2SChannelNames(t *testing.T) {
Params.Init()
names := Params.K2SChannelNames
assert.Equal(t, len(names), 2)
assert.Equal(t, names[0], "k2s-5")
assert.Equal(t, names[1], "k2s-6")
}

View File

@ -1,216 +0,0 @@
package master
import (
"log"
"strconv"
"strings"
"github.com/zilliztech/milvus-distributed/internal/util/paramtable"
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
type ParamTable struct {
paramtable.BaseTable
}
var Params ParamTable
func (p *ParamTable) InitParamTable() {
p.Init()
}
func (p *ParamTable) Address() string {
masterAddress, _ := p.Load("master.address")
return masterAddress
}
func (p *ParamTable) Port() int {
masterPort, _ := p.Load("master.port")
port, err := strconv.Atoi(masterPort)
if err != nil {
panic(err)
}
return port
}
func (p *ParamTable) PulsarToic() string {
pulsarTopic, _ := p.Load("master.pulsartopic")
return pulsarTopic
}
func (p *ParamTable) SegmentThreshold() float64 {
threshold, _ := p.Load("master.segmentThreshold")
segmentThreshold, err := strconv.ParseFloat(threshold, 32)
if err != nil {
panic(err)
}
return segmentThreshold
}
func (p *ParamTable) DefaultRecordSize() int64 {
size, _ := p.Load("master.defaultSizePerRecord")
res, err := strconv.ParseInt(size, 10, 64)
if err != nil {
panic(err)
}
return res
}
func (p *ParamTable) MinimumAssignSize() int64 {
size, _ := p.Load("master.minimumAssignSize")
res, err := strconv.ParseInt(size, 10, 64)
if err != nil {
panic(err)
}
return res
}
func (p *ParamTable) SegmentExpireDuration() int64 {
duration, _ := p.Load("master.segmentExpireDuration")
res, err := strconv.ParseInt(duration, 10, 64)
if err != nil {
panic(err)
}
return res
}
func (p *ParamTable) QueryNodeNum() (int, error) {
num, _ := p.Load("master.querynodenum")
return strconv.Atoi(num)
}
func (p *ParamTable) StatsChannels() string {
channels, _ := p.Load("master.statsChannels")
return channels
}
func (p *ParamTable) ProxyIDList() []typeutil.UniqueID {
id, err := p.Load("master.proxyidlist")
if err != nil {
log.Panicf("load proxy id list error, %s", err.Error())
}
ids := strings.Split(id, ",")
idlist := make([]typeutil.UniqueID, 0, len(ids))
for _, i := range ids {
v, err := strconv.ParseInt(i, 10, 64)
if err != nil {
log.Panicf("load proxy id list error, %s", err.Error())
}
idlist = append(idlist, typeutil.UniqueID(v))
}
return idlist
}
func (p *ParamTable) ProxyTimeSyncChannels() []string {
chs, err := p.Load("master.proxyTimeSyncChannels")
if err != nil {
log.Panic(err)
}
return strings.Split(chs, ",")
}
func (p *ParamTable) ProxyTimeSyncSubName() string {
name, err := p.Load("master.proxyTimeSyncSubName")
if err != nil {
log.Panic(err)
}
return name
}
func (p *ParamTable) SoftTimeTickBarrierInterval() typeutil.Timestamp {
t, err := p.Load("master.softTimeTickBarrierInterval")
if err != nil {
log.Panic(err)
}
v, err := strconv.ParseInt(t, 10, 64)
if err != nil {
log.Panic(err)
}
return tsoutil.ComposeTS(v, 0)
}
func (p *ParamTable) WriteIDList() []typeutil.UniqueID {
id, err := p.Load("master.writeidlist")
if err != nil {
log.Panic(err)
}
ids := strings.Split(id, ",")
idlist := make([]typeutil.UniqueID, 0, len(ids))
for _, i := range ids {
v, err := strconv.ParseInt(i, 10, 64)
if err != nil {
log.Panicf("load proxy id list error, %s", err.Error())
}
idlist = append(idlist, typeutil.UniqueID(v))
}
return idlist
}
func (p *ParamTable) WriteTimeSyncChannels() []string {
chs, err := p.Load("master.writeTimeSyncChannels")
if err != nil {
log.Fatal(err)
}
return strings.Split(chs, ",")
}
func (p *ParamTable) WriteTimeSyncSubName() string {
name, err := p.Load("master.writeTimeSyncSubName")
if err != nil {
log.Fatal(err)
}
return name
}
func (p *ParamTable) DMTimeSyncChannels() []string {
chs, err := p.Load("master.dmTimeSyncChannels")
if err != nil {
log.Fatal(err)
}
return strings.Split(chs, ",")
}
func (p *ParamTable) K2STimeSyncChannels() []string {
chs, err := p.Load("master.k2sTimeSyncChannels")
if err != nil {
log.Fatal(err)
}
return strings.Split(chs, ",")
}
func (p *ParamTable) PulsarAddress() string {
pulsarAddress, err := p.Load("_PulsarAddress")
if err != nil {
panic(err)
}
return pulsarAddress
}
func (p *ParamTable) EtcdAddress() string {
etcdAddress, err := p.Load("_EtcdAddress")
if err != nil {
panic(err)
}
return etcdAddress
}
func (p *ParamTable) EtcdRootPath() string {
etcdRootPath, err := p.Load("etcd.rootpath")
if err != nil {
panic(err)
}
return etcdRootPath
}
func (p *ParamTable) TopicNum() int {
topicNum, err := p.Load("pulsar.topicnum")
if err != nil {
panic(err)
}
num, err := strconv.Atoi(topicNum)
if err != nil {
panic(err)
}
return num
}

View File

@ -24,37 +24,46 @@ func TestMaster_Partition(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
etcdAddr := Params.EtcdAddress()
etcdAddr := Params.EtcdAddress
etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)
_, err = etcdCli.Delete(ctx, "/test/root", clientv3.WithPrefix())
assert.Nil(t, err)
opt := Option{
KVRootPath: "/test/root/kv",
MetaRootPath: "/test/root/meta",
EtcdAddr: []string{etcdAddr},
PulsarAddr: Params.PulsarAddress(),
ProxyIDs: []typeutil.UniqueID{1, 2},
PulsarProxyChannels: []string{"proxy1", "proxy2"},
PulsarProxySubName: "proxyTopics",
SoftTTBInterval: 300,
WriteIDs: []typeutil.UniqueID{3, 4},
PulsarWriteChannels: []string{"write3", "write4"},
PulsarWriteSubName: "writeTopics",
PulsarDMChannels: []string{"dm0", "dm1"},
PulsarK2SChannels: []string{"k2s0", "k2s1"},
Params = ParamTable{
Address: Params.Address,
Port: Params.Port,
EtcdAddress: Params.EtcdAddress,
EtcdRootPath: "/test/root",
PulsarAddress: Params.PulsarAddress,
ProxyIDList: []typeutil.UniqueID{1, 2},
WriteNodeIDList: []typeutil.UniqueID{3, 4},
TopicNum: 5,
QueryNodeNum: 3,
SoftTimeTickBarrierInterval: 300,
// segment
SegmentSize: 536870912 / 1024 / 1024,
SegmentSizeFactor: 0.75,
DefaultRecordSize: 1024,
MinimumAssignSize: 1048576,
SegmentThreshold: 536870912,
SegmentExpireDuration: 2000,
NumOfChannel: 5,
NumOfQueryNode: 3,
StatsChannels: "statistic",
MinSegIDAssignCnt: 1048576 / 1024,
MaxSegIDAssignCnt: Params.MaxSegIDAssignCnt,
SegIDAssignExpiration: 2000,
// msgChannel
ProxyTimeTickChannelNames: []string{"proxy1", "proxy2"},
WriteNodeTimeTickChannelNames: []string{"write3", "write4"},
InsertChannelNames: []string{"dm0", "dm1"},
K2SChannelNames: []string{"k2s0", "k2s1"},
QueryNodeStatsChannelName: "statistic",
MsgChannelSubName: Params.MsgChannelSubName,
}
port := 10000 + rand.Intn(1000)
svr, err := CreateServer(ctx, &opt)
svr, err := CreateServer(ctx)
assert.Nil(t, err)
err = svr.Run(int64(port))
assert.Nil(t, err)

View File

@ -29,20 +29,21 @@ type segmentStatus struct {
}
type SegmentManager struct {
metaTable *metaTable
statsStream msgstream.MsgStream
channelRanges []*channelRange
segmentStatus map[UniqueID]*segmentStatus // segment id to segment status
collStatus map[UniqueID]*collectionStatus // collection id to collection status
defaultSizePerRecord int64
minimumAssignSize int64
segmentThreshold int64
segmentExpireDuration int64
numOfChannels int
numOfQueryNodes int
globalIDAllocator func() (UniqueID, error)
globalTSOAllocator func() (Timestamp, error)
mu sync.RWMutex
metaTable *metaTable
statsStream msgstream.MsgStream
channelRanges []*channelRange
segmentStatus map[UniqueID]*segmentStatus // segment id to segment status
collStatus map[UniqueID]*collectionStatus // collection id to collection status
defaultSizePerRecord int64
minimumAssignSize int64
segmentThreshold float64
segmentThresholdFactor float64
segmentExpireDuration int64
numOfChannels int
numOfQueryNodes int
globalIDAllocator func() (UniqueID, error)
globalTSOAllocator func() (Timestamp, error)
mu sync.RWMutex
}
func (segMgr *SegmentManager) HandleQueryNodeMsgPack(msgPack *msgstream.MsgPack) error {
@ -76,7 +77,7 @@ func (segMgr *SegmentManager) handleSegmentStat(segStats *internalpb.SegmentStat
segMeta.NumRows = segStats.NumRows
segMeta.MemSize = segStats.MemorySize
if segStats.MemorySize > segMgr.segmentThreshold {
if segStats.MemorySize > int64(segMgr.segmentThresholdFactor*segMgr.segmentThreshold) {
return segMgr.closeSegment(segMeta)
}
return segMgr.metaTable.UpdateSegment(segMeta)
@ -150,6 +151,7 @@ func (segMgr *SegmentManager) AssignSegmentID(segIDReq []*internalpb.SegIDReques
func (segMgr *SegmentManager) assignSegment(collName string, collID UniqueID, partitionTag string, count uint32, channelID int32,
collStatus *collectionStatus) (*internalpb.SegIDAssignment, error) {
segmentThreshold := int64(segMgr.segmentThreshold)
for _, segID := range collStatus.openedSegments {
segMeta, _ := segMgr.metaTable.GetSegmentByID(segID)
if segMeta.GetCloseTime() != 0 || channelID < segMeta.GetChannelStart() ||
@ -160,8 +162,8 @@ func (segMgr *SegmentManager) assignSegment(collName string, collID UniqueID, pa
assignedMem := segMgr.checkAssignedSegExpire(segID)
memSize := segMeta.MemSize
neededMemSize := segMgr.calNeededSize(memSize, segMeta.NumRows, int64(count))
if memSize+assignedMem+neededMemSize <= segMgr.segmentThreshold {
remainingSize := segMgr.segmentThreshold - memSize - assignedMem
if memSize+assignedMem+neededMemSize <= segmentThreshold {
remainingSize := segmentThreshold - memSize - assignedMem
allocMemSize := segMgr.calAllocMemSize(neededMemSize, remainingSize)
segMgr.addAssignment(segID, allocMemSize)
return &internalpb.SegIDAssignment{
@ -174,7 +176,7 @@ func (segMgr *SegmentManager) assignSegment(collName string, collID UniqueID, pa
}
}
neededMemSize := segMgr.defaultSizePerRecord * int64(count)
if neededMemSize > segMgr.segmentThreshold {
if neededMemSize > segmentThreshold {
return nil, errors.Errorf("request with count %d need about %d mem size which is larger than segment threshold",
count, neededMemSize)
}
@ -184,7 +186,7 @@ func (segMgr *SegmentManager) assignSegment(collName string, collID UniqueID, pa
return nil, err
}
allocMemSize := segMgr.calAllocMemSize(neededMemSize, segMgr.segmentThreshold)
allocMemSize := segMgr.calAllocMemSize(neededMemSize, segmentThreshold)
segMgr.addAssignment(segMeta.SegmentID, allocMemSize)
return &internalpb.SegIDAssignment{
SegID: segMeta.SegmentID,
@ -322,23 +324,23 @@ func (segMgr *SegmentManager) createChannelRanges() error {
}
func NewSegmentManager(meta *metaTable,
opt *Option,
globalIDAllocator func() (UniqueID, error),
globalTSOAllocator func() (Timestamp, error),
) *SegmentManager {
segMgr := &SegmentManager{
metaTable: meta,
channelRanges: make([]*channelRange, 0),
segmentStatus: make(map[UniqueID]*segmentStatus),
collStatus: make(map[UniqueID]*collectionStatus),
segmentThreshold: int64(opt.SegmentThreshold),
segmentExpireDuration: opt.SegmentExpireDuration,
minimumAssignSize: opt.MinimumAssignSize,
defaultSizePerRecord: opt.DefaultRecordSize,
numOfChannels: opt.NumOfChannel,
numOfQueryNodes: opt.NumOfQueryNode,
globalIDAllocator: globalIDAllocator,
globalTSOAllocator: globalTSOAllocator,
metaTable: meta,
channelRanges: make([]*channelRange, 0),
segmentStatus: make(map[UniqueID]*segmentStatus),
collStatus: make(map[UniqueID]*collectionStatus),
segmentThreshold: Params.SegmentSize * 1024 * 1024,
segmentThresholdFactor: Params.SegmentSizeFactor,
segmentExpireDuration: Params.SegIDAssignExpiration,
minimumAssignSize: Params.MinSegIDAssignCnt * Params.DefaultRecordSize,
defaultSizePerRecord: Params.DefaultRecordSize,
numOfChannels: Params.TopicNum,
numOfQueryNodes: Params.QueryNodeNum,
globalIDAllocator: globalIDAllocator,
globalTSOAllocator: globalTSOAllocator,
}
segMgr.createChannelRanges()
return segMgr

View File

@ -7,22 +7,21 @@ import (
"testing"
"time"
"github.com/gogo/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
"github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"google.golang.org/grpc"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/kv"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
pb "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
"github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"go.etcd.io/etcd/clientv3"
"google.golang.org/grpc"
)
var mt *metaTable
@ -36,7 +35,7 @@ var masterCancelFunc context.CancelFunc
func setup() {
Params.Init()
etcdAddress := Params.EtcdAddress()
etcdAddress := Params.EtcdAddress
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}})
if err != nil {
@ -77,18 +76,18 @@ func setup() {
if err != nil {
panic(err)
}
opt := &Option{
SegmentThreshold: 536870912,
SegmentExpireDuration: 2000,
MinimumAssignSize: 1048576,
DefaultRecordSize: 1024,
NumOfQueryNode: 3,
NumOfChannel: 5,
}
var cnt int64
segMgr = NewSegmentManager(mt, opt,
Params.TopicNum = 5
Params.QueryNodeNum = 3
Params.SegmentSize = 536870912 / 1024 / 1024
Params.SegmentSizeFactor = 0.75
Params.DefaultRecordSize = 1024
Params.MinSegIDAssignCnt = 1048576 / 1024
Params.SegIDAssignExpiration = 2000
segMgr = NewSegmentManager(mt,
func() (UniqueID, error) {
val := atomic.AddInt64(&cnt, 1)
return val, nil
@ -209,7 +208,7 @@ func TestSegmentManager_SegmentStats(t *testing.T) {
// close segment
stats.SegStats[0].NumRows = 600000
stats.SegStats[0].MemorySize = 600000000
stats.SegStats[0].MemorySize = int64(0.8 * segMgr.segmentThreshold)
err = segMgr.HandleQueryNodeMsgPack(&msgPack)
assert.Nil(t, err)
segMeta, _ = mt.GetSegmentByID(100)
@ -219,7 +218,7 @@ func TestSegmentManager_SegmentStats(t *testing.T) {
func startupMaster() {
Params.Init()
etcdAddress := Params.EtcdAddress()
etcdAddress := Params.EtcdAddress
rootPath := "/test/root"
ctx, cancel := context.WithCancel(context.TODO())
masterCancelFunc = cancel
@ -231,32 +230,40 @@ func startupMaster() {
if err != nil {
panic(err)
}
pulsarAddress := Params.PulsarAddress()
opt := &Option{
KVRootPath: "/test/root/kv",
MetaRootPath: "/test/root/meta",
EtcdAddr: []string{etcdAddress},
PulsarAddr: pulsarAddress,
ProxyIDs: []typeutil.UniqueID{1, 2},
PulsarProxyChannels: []string{"proxy1", "proxy2"},
PulsarProxySubName: "proxyTopics",
SoftTTBInterval: 300,
WriteIDs: []typeutil.UniqueID{3, 4},
PulsarWriteChannels: []string{"write3", "write4"},
PulsarWriteSubName: "writeTopics",
PulsarDMChannels: []string{"dm0", "dm1"},
PulsarK2SChannels: []string{"k2s0", "k2s1"},
Params = ParamTable{
Address: Params.Address,
Port: Params.Port,
EtcdAddress: Params.EtcdAddress,
EtcdRootPath: rootPath,
PulsarAddress: Params.PulsarAddress,
ProxyIDList: []typeutil.UniqueID{1, 2},
WriteNodeIDList: []typeutil.UniqueID{3, 4},
TopicNum: 5,
QueryNodeNum: 3,
SoftTimeTickBarrierInterval: 300,
// segment
SegmentSize: 536870912 / 1024 / 1024,
SegmentSizeFactor: 0.75,
DefaultRecordSize: 1024,
MinimumAssignSize: 1048576,
SegmentThreshold: 536870912,
SegmentExpireDuration: 2000,
NumOfChannel: 5,
NumOfQueryNode: 3,
StatsChannels: "statistic",
MinSegIDAssignCnt: 1048576 / 1024,
MaxSegIDAssignCnt: Params.MaxSegIDAssignCnt,
SegIDAssignExpiration: 2000,
// msgChannel
ProxyTimeTickChannelNames: []string{"proxy1", "proxy2"},
WriteNodeTimeTickChannelNames: []string{"write3", "write4"},
InsertChannelNames: []string{"dm0", "dm1"},
K2SChannelNames: []string{"k2s0", "k2s1"},
QueryNodeStatsChannelName: "statistic",
MsgChannelSubName: Params.MsgChannelSubName,
}
master, err = CreateServer(ctx, opt)
master, err = CreateServer(ctx)
if err != nil {
panic(err)
}
@ -327,7 +334,7 @@ func TestSegmentManager_RPC(t *testing.T) {
// test stats
segID := assignments[0].SegID
pulsarAddress := Params.PulsarAddress()
pulsarAddress := Params.PulsarAddress
ms := msgstream.NewPulsarMsgStream(ctx, 1024)
ms.SetPulsarClient(pulsarAddress)
ms.CreatePulsarProducers([]string{"statistic"})

View File

@ -80,7 +80,7 @@ func receiveMsg(stream *ms.MsgStream) []uint64 {
func TestStream_PulsarMsgStream_TimeTick(t *testing.T) {
Init()
pulsarAddress := Params.PulsarAddress()
pulsarAddress := Params.PulsarAddress
producerChannels := []string{"proxyTtBarrier"}
consumerChannels := []string{"proxyTtBarrier"}

View File

@ -68,7 +68,7 @@ func getEmptyMsgPack() *ms.MsgPack {
func producer(channels []string, ttmsgs [][2]int) (*ms.MsgStream, *ms.MsgStream) {
Init()
pulsarAddress := Params.PulsarAddress()
pulsarAddress := Params.PulsarAddress
consumerSubName := "subTimetick"
producerChannels := channels
consumerChannels := channels

View File

@ -5,7 +5,6 @@ import (
"fmt"
"log"
"os"
"path"
"strconv"
"sync"
"testing"
@ -20,7 +19,6 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
"github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"go.etcd.io/etcd/clientv3"
)
@ -38,10 +36,8 @@ var testNum = 10
func startMaster(ctx context.Context) {
master.Init()
etcdAddr := master.Params.EtcdAddress()
rootPath := master.Params.EtcdRootPath()
kvRootPath := path.Join(rootPath, "kv")
metaRootPath := path.Join(rootPath, "meta")
etcdAddr := master.Params.EtcdAddress
rootPath := master.Params.EtcdRootPath
etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
if err != nil {
@ -52,35 +48,12 @@ func startMaster(ctx context.Context) {
panic(err)
}
opt := master.Option{
KVRootPath: kvRootPath,
MetaRootPath: metaRootPath,
EtcdAddr: []string{etcdAddr},
PulsarAddr: Params.PulsarAddress(),
ProxyIDs: []typeutil.UniqueID{1, 2},
PulsarProxyChannels: []string{"proxy1", "proxy2"},
PulsarProxySubName: "proxyTopics",
SoftTTBInterval: 300,
WriteIDs: []typeutil.UniqueID{3, 4},
PulsarWriteChannels: []string{"write3", "write4"},
PulsarWriteSubName: "writeTopics",
PulsarDMChannels: []string{"dm0", "dm1"},
PulsarK2SChannels: []string{"k2s0", "k2s1"},
DefaultRecordSize: 1024,
MinimumAssignSize: 1048576,
SegmentThreshold: 536870912,
SegmentExpireDuration: 2000,
NumOfChannel: 5,
NumOfQueryNode: 3,
StatsChannels: "statistic",
}
svr, err := master.CreateServer(ctx, &opt)
svr, err := master.CreateServer(ctx)
masterServer = svr
if err != nil {
log.Print("create server failed", zap.Error(err))
}
if err := svr.Run(int64(master.Params.Port())); err != nil {
if err := svr.Run(int64(master.Params.Port)); err != nil {
log.Fatal("run server failed", zap.Error(err))
}