mirror of https://github.com/milvus-io/milvus.git
Add timeout logic to task in Proxy
Signed-off-by: dragondriver <jiquan.long@zilliz.com>pull/4973/head^2
parent
72ed65d7f8
commit
b32e55d5f0
|
@ -27,8 +27,8 @@ func main() {
|
|||
sig = <-sc
|
||||
cancel()
|
||||
}()
|
||||
pulsarAddress, _ := reader.Params.PulsarAddress()
|
||||
reader.StartQueryNode(ctx, pulsarAddress)
|
||||
|
||||
reader.StartQueryNode(ctx)
|
||||
|
||||
switch sig {
|
||||
case syscall.SIGTERM:
|
||||
|
|
|
@ -75,6 +75,9 @@ struct TermExpr : Expr {
|
|||
struct RangeExpr : Expr {
|
||||
FieldId field_id_;
|
||||
DataType data_type_;
|
||||
enum class OpType { LessThan, LessEqual, GreaterThan, GreaterEqual, Equal, NotEqual };
|
||||
static const std::map<std::string, OpType> mapping_; // op_name -> op
|
||||
|
||||
// std::vector<std::tuple<OpType, std::any>> conditions_;
|
||||
protected:
|
||||
// prevent accidential instantiation
|
||||
|
|
|
@ -9,7 +9,6 @@ struct TermExprImpl : TermExpr {
|
|||
|
||||
template <typename T>
|
||||
struct RangeExprImpl : RangeExpr {
|
||||
enum class OpType { LessThan, LessEqual, GreaterThan, GreaterEqual, Equal, NotEqual };
|
||||
std::vector<std::tuple<OpType, T>> conditions_;
|
||||
};
|
||||
|
||||
|
|
|
@ -3,93 +3,128 @@
|
|||
#include "PlanNode.h"
|
||||
#include "utils/EasyAssert.h"
|
||||
#include "pb/service_msg.pb.h"
|
||||
#include "ExprImpl.h"
|
||||
#include <vector>
|
||||
#include <memory>
|
||||
#include <boost/align/aligned_allocator.hpp>
|
||||
|
||||
namespace milvus::query {
|
||||
|
||||
static std::unique_ptr<VectorPlanNode>
|
||||
CreateVecNode(const std::string& field_name, const Json& vec_info) {
|
||||
ParseVecNode(Plan* plan, const Json& out_body) {
|
||||
// TODO add binary info
|
||||
auto vec_node = std::make_unique<FloatVectorANNS>();
|
||||
Assert(out_body.size() == 1);
|
||||
auto iter = out_body.begin();
|
||||
std::string field_name = iter.key();
|
||||
auto& vec_info = iter.value();
|
||||
auto topK = vec_info["topk"];
|
||||
vec_node->query_info_.topK_ = topK;
|
||||
vec_node->query_info_.metric_type_ = vec_info["metric_type"];
|
||||
vec_node->query_info_.search_params_ = vec_info["params"];
|
||||
vec_node->query_info_.field_id_ = field_name;
|
||||
vec_node->placeholder_tag_ = vec_info["query"];
|
||||
auto tag = vec_node->placeholder_tag_;
|
||||
AssertInfo(!plan->tag2field_.count(tag), "duplicated placeholder tag");
|
||||
plan->tag2field_.emplace(tag, field_name);
|
||||
return vec_node;
|
||||
}
|
||||
|
||||
/// initialize RangeExpr::mapping_
|
||||
const std::map<std::string, RangeExpr::OpType> RangeExpr::mapping_ = {
|
||||
{"lt", OpType::LessThan}, {"le", OpType::LessEqual}, {"lte", OpType::LessEqual},
|
||||
{"gt", OpType::GreaterThan}, {"ge", OpType::GreaterEqual}, {"gte", OpType::GreaterEqual},
|
||||
{"eq", OpType::Equal}, {"ne", OpType::NotEqual},
|
||||
};
|
||||
|
||||
static inline std::string
|
||||
to_lower(const std::string& raw) {
|
||||
auto data = raw;
|
||||
std::transform(data.begin(), data.end(), data.begin(), [](unsigned char c) { return std::tolower(c); });
|
||||
return data;
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
std::unique_ptr<Expr>
|
||||
ParseRangeNodeImpl(const Schema& schema, const std::string& field_name, const Json& body) {
|
||||
auto expr = std::make_unique<RangeExprImpl<T>>();
|
||||
auto data_type = schema[field_name].get_data_type();
|
||||
expr->data_type_ = data_type;
|
||||
expr->field_id_ = field_name;
|
||||
for (auto& item : body.items()) {
|
||||
auto& op_name = item.key();
|
||||
auto op = RangeExpr::mapping_.at(to_lower(op_name));
|
||||
T value = item.value();
|
||||
expr->conditions_.emplace_back(op, value);
|
||||
}
|
||||
return expr;
|
||||
}
|
||||
|
||||
std::unique_ptr<Expr>
|
||||
ParseRangeNode(const Schema& schema, const Json& out_body) {
|
||||
Assert(out_body.size() == 1);
|
||||
auto out_iter = out_body.begin();
|
||||
auto field_name = out_iter.key();
|
||||
auto body = out_iter.value();
|
||||
auto data_type = schema[field_name].get_data_type();
|
||||
Assert(!field_is_vector(data_type));
|
||||
switch (data_type) {
|
||||
case DataType::BOOL:
|
||||
return ParseRangeNodeImpl<bool>(schema, field_name, body);
|
||||
case DataType::INT8:
|
||||
return ParseRangeNodeImpl<int8_t>(schema, field_name, body);
|
||||
case DataType::INT16:
|
||||
return ParseRangeNodeImpl<int16_t>(schema, field_name, body);
|
||||
case DataType::INT32:
|
||||
return ParseRangeNodeImpl<int32_t>(schema, field_name, body);
|
||||
case DataType::INT64:
|
||||
return ParseRangeNodeImpl<int64_t>(schema, field_name, body);
|
||||
case DataType::FLOAT:
|
||||
return ParseRangeNodeImpl<float>(schema, field_name, body);
|
||||
case DataType::DOUBLE:
|
||||
return ParseRangeNodeImpl<double>(schema, field_name, body);
|
||||
default:
|
||||
PanicInfo("unsupported");
|
||||
}
|
||||
}
|
||||
|
||||
static std::unique_ptr<Plan>
|
||||
CreatePlanImplNaive(const std::string& dsl_str) {
|
||||
auto plan = std::make_unique<Plan>();
|
||||
CreatePlanImplNaive(const Schema& schema, const std::string& dsl_str) {
|
||||
auto plan = std::make_unique<Plan>(schema);
|
||||
auto dsl = nlohmann::json::parse(dsl_str);
|
||||
nlohmann::json vec_pack;
|
||||
std::optional<std::unique_ptr<Expr>> predicate;
|
||||
|
||||
auto& bool_dsl = dsl["bool"];
|
||||
if (bool_dsl.contains("must")) {
|
||||
auto& packs = bool_dsl["must"];
|
||||
for (auto& pack : packs) {
|
||||
if (pack.contains("vector")) {
|
||||
auto iter = pack["vector"].begin();
|
||||
auto key = iter.key();
|
||||
auto& body = iter.value();
|
||||
plan->plan_node_ = CreateVecNode(key, body);
|
||||
return plan;
|
||||
auto& out_body = pack["vector"];
|
||||
plan->plan_node_ = ParseVecNode(plan.get(), out_body);
|
||||
} else if (pack.contains("range")) {
|
||||
AssertInfo(!predicate, "unsupported complex DSL");
|
||||
auto& out_body = pack["range"];
|
||||
predicate = ParseRangeNode(schema, out_body);
|
||||
} else {
|
||||
PanicInfo("unsupported node");
|
||||
}
|
||||
}
|
||||
PanicInfo("Unsupported DSL: vector node not detected");
|
||||
AssertInfo(plan->plan_node_, "vector node not found");
|
||||
} else if (bool_dsl.contains("vector")) {
|
||||
auto iter = bool_dsl["vector"].begin();
|
||||
auto key = iter.key();
|
||||
auto& body = iter.value();
|
||||
plan->plan_node_ = CreateVecNode(key, body);
|
||||
return plan;
|
||||
auto& out_body = bool_dsl["vector"];
|
||||
plan->plan_node_ = ParseVecNode(plan.get(), out_body);
|
||||
Assert(plan->plan_node_);
|
||||
} else {
|
||||
PanicInfo("Unsupported DSL: vector node not detected");
|
||||
PanicInfo("Unsupported DSL");
|
||||
}
|
||||
plan->plan_node_->predicate_ = std::move(predicate);
|
||||
return plan;
|
||||
}
|
||||
|
||||
static std::unique_ptr<Plan>
|
||||
CreateRangeNode(const Json& range_group) {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
void
|
||||
CheckNull(const Json& json) {
|
||||
Assert(!json.is_null());
|
||||
}
|
||||
|
||||
class PlanParser {
|
||||
public:
|
||||
void
|
||||
ParseBoolBody(const Json& dsl) {
|
||||
CheckNull(dsl);
|
||||
for (const auto& item : dsl.items()) {
|
||||
PanicInfo("unimplemented");
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
CreatePlanImpl(const Json& dsl) {
|
||||
if (dsl.empty()) {
|
||||
PanicInfo("DSL Is Empty or Invalid");
|
||||
}
|
||||
if (!dsl.contains("bool")) {
|
||||
auto bool_dsl = dsl["bool"];
|
||||
ParseBoolBody(bool_dsl);
|
||||
}
|
||||
PanicInfo("unimplemented");
|
||||
}
|
||||
|
||||
private:
|
||||
};
|
||||
|
||||
std::unique_ptr<Plan>
|
||||
CreatePlan(const Schema& schema, const std::string& dsl_str) {
|
||||
(void)schema;
|
||||
auto plan = CreatePlanImplNaive(dsl_str);
|
||||
auto plan = CreatePlanImplNaive(schema, dsl_str);
|
||||
return plan;
|
||||
}
|
||||
|
||||
|
|
|
@ -13,7 +13,14 @@ using Json = nlohmann::json;
|
|||
|
||||
// class definitions
|
||||
struct Plan {
|
||||
public:
|
||||
Plan(const Schema& schema) : schema_(schema) {
|
||||
}
|
||||
|
||||
public:
|
||||
const Schema& schema_;
|
||||
std::unique_ptr<VectorPlanNode> plan_node_;
|
||||
std::map<std::string, FieldId> tag2field_; // PlaceholderName -> FieldId
|
||||
// TODO: add move extra info
|
||||
};
|
||||
|
||||
|
|
|
@ -37,6 +37,7 @@ struct QueryInfo {
|
|||
struct VectorPlanNode : PlanNode {
|
||||
std::optional<ExprPtr> predicate_;
|
||||
QueryInfo query_info_;
|
||||
std::string placeholder_tag_;
|
||||
|
||||
public:
|
||||
virtual void
|
||||
|
@ -44,16 +45,12 @@ struct VectorPlanNode : PlanNode {
|
|||
};
|
||||
|
||||
struct FloatVectorANNS : VectorPlanNode {
|
||||
std::string placeholder_tag_;
|
||||
|
||||
public:
|
||||
void
|
||||
accept(PlanNodeVisitor&) override;
|
||||
};
|
||||
|
||||
struct BinaryVectorANNS : VectorPlanNode {
|
||||
std::string placeholder_tag_;
|
||||
|
||||
public:
|
||||
void
|
||||
accept(PlanNodeVisitor&) override;
|
||||
|
|
|
@ -132,10 +132,16 @@ ShowExprVisitor::visit(TermExpr& expr) {
|
|||
|
||||
template <typename T>
|
||||
static Json
|
||||
CondtionExtract(const RangeExpr& expr_raw) {
|
||||
auto expr = dynamic_cast<const TermExprImpl<T>*>(&expr_raw);
|
||||
ConditionExtract(const RangeExpr& expr_raw) {
|
||||
auto expr = dynamic_cast<const RangeExprImpl<T>*>(&expr_raw);
|
||||
Assert(expr);
|
||||
return Json{expr->terms_};
|
||||
std::map<std::string, T> mapping;
|
||||
for (auto [op, v] : expr->conditions_) {
|
||||
// TODO: use name
|
||||
auto op_name = "op(" + std::to_string((int)op) + ")";
|
||||
mapping[op_name] = v;
|
||||
}
|
||||
return mapping;
|
||||
}
|
||||
|
||||
void
|
||||
|
@ -145,19 +151,19 @@ ShowExprVisitor::visit(RangeExpr& expr) {
|
|||
auto conditions = [&] {
|
||||
switch (expr.data_type_) {
|
||||
case DataType::BOOL:
|
||||
return CondtionExtract<bool>(expr);
|
||||
return ConditionExtract<bool>(expr);
|
||||
case DataType::INT8:
|
||||
return CondtionExtract<int8_t>(expr);
|
||||
return ConditionExtract<int8_t>(expr);
|
||||
case DataType::INT16:
|
||||
return CondtionExtract<int16_t>(expr);
|
||||
return ConditionExtract<int16_t>(expr);
|
||||
case DataType::INT32:
|
||||
return CondtionExtract<int32_t>(expr);
|
||||
return ConditionExtract<int32_t>(expr);
|
||||
case DataType::INT64:
|
||||
return CondtionExtract<int64_t>(expr);
|
||||
return ConditionExtract<int64_t>(expr);
|
||||
case DataType::DOUBLE:
|
||||
return CondtionExtract<double>(expr);
|
||||
return ConditionExtract<double>(expr);
|
||||
case DataType::FLOAT:
|
||||
return CondtionExtract<float>(expr);
|
||||
return ConditionExtract<float>(expr);
|
||||
default:
|
||||
PanicInfo("unsupported type");
|
||||
}
|
||||
|
@ -167,5 +173,6 @@ ShowExprVisitor::visit(RangeExpr& expr) {
|
|||
{"field_id", expr.field_id_},
|
||||
{"data_type", datatype_name(expr.data_type_)},
|
||||
{"conditions", std::move(conditions)}};
|
||||
ret_ = res;
|
||||
}
|
||||
} // namespace milvus::query
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
#include <optional>
|
||||
|
||||
#include "query/generated/ShowPlanNodeVisitor.h"
|
||||
#include "query/generated/ShowExprVisitor.h"
|
||||
|
||||
namespace milvus::query {
|
||||
#if 0
|
||||
|
@ -49,7 +50,9 @@ ShowPlanNodeVisitor::visit(FloatVectorANNS& node) {
|
|||
{"placeholder_tag", node.placeholder_tag_}, //
|
||||
};
|
||||
if (node.predicate_.has_value()) {
|
||||
PanicInfo("unimplemented");
|
||||
ShowExprVisitor expr_show;
|
||||
Assert(node.predicate_.value());
|
||||
json_body["predicate"] = expr_show.call_child(node.predicate_->operator*());
|
||||
} else {
|
||||
json_body["predicate"] = "None";
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
#include "query/generated/PlanNodeVisitor.h"
|
||||
#include "test_utils/DataGen.h"
|
||||
#include "query/generated/ShowPlanNodeVisitor.h"
|
||||
#include "query/Plan.h"
|
||||
using namespace milvus;
|
||||
|
||||
TEST(Expr, Naive) {
|
||||
|
@ -49,13 +50,55 @@ TEST(Expr, Naive) {
|
|||
})";
|
||||
}
|
||||
|
||||
TEST(Expr, Range) {
|
||||
SUCCEED();
|
||||
using namespace milvus;
|
||||
using namespace milvus::query;
|
||||
using namespace milvus::segcore;
|
||||
std::string dsl_string = R"(
|
||||
{
|
||||
"bool": {
|
||||
"must": [
|
||||
{
|
||||
"range": {
|
||||
"age": {
|
||||
"GT": 1,
|
||||
"LT": 100
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"vector": {
|
||||
"fakevec": {
|
||||
"metric_type": "L2",
|
||||
"params": {
|
||||
"nprobe": 10
|
||||
},
|
||||
"query": "$0",
|
||||
"topk": 10
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
})";
|
||||
auto schema = std::make_shared<Schema>();
|
||||
schema->AddField("fakevec", DataType::VECTOR_FLOAT, 16);
|
||||
schema->AddField("age", DataType::INT32);
|
||||
auto plan = CreatePlan(*schema, dsl_string);
|
||||
ShowPlanNodeVisitor shower;
|
||||
Assert(plan->tag2field_.at("$0") == "fakevec");
|
||||
auto out = shower.call_child(*plan->plan_node_);
|
||||
std::cout << out.dump(4);
|
||||
}
|
||||
|
||||
TEST(Expr, ShowExecutor) {
|
||||
using namespace milvus::query;
|
||||
using namespace milvus::segcore;
|
||||
auto node = std::make_unique<FloatVectorANNS>();
|
||||
auto schema = std::make_shared<Schema>();
|
||||
int64_t num_queries = 100L;
|
||||
schema->AddField("fakevec", DataType::VECTOR_FLOAT, 16);
|
||||
int64_t num_queries = 100L;
|
||||
auto raw_data = DataGen(schema, num_queries);
|
||||
auto& info = node->query_info_;
|
||||
info.metric_type_ = "L2";
|
||||
|
@ -68,4 +111,4 @@ TEST(Expr, ShowExecutor) {
|
|||
auto dup = res;
|
||||
dup["data"] = "...collased...";
|
||||
std::cout << dup.dump(4);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -167,7 +167,7 @@ func (t *describeCollectionTask) Ts() (Timestamp, error) {
|
|||
if t.req == nil {
|
||||
return 0, errors.New("null request")
|
||||
}
|
||||
return Timestamp(t.req.Timestamp), nil
|
||||
return t.req.Timestamp, nil
|
||||
}
|
||||
|
||||
func (t *describeCollectionTask) Execute() error {
|
||||
|
@ -199,7 +199,7 @@ func (t *showCollectionsTask) Ts() (Timestamp, error) {
|
|||
if t.req == nil {
|
||||
return 0, errors.New("null request")
|
||||
}
|
||||
return Timestamp(t.req.Timestamp), nil
|
||||
return t.req.Timestamp, nil
|
||||
}
|
||||
|
||||
func (t *showCollectionsTask) Execute() error {
|
||||
|
|
|
@ -132,17 +132,6 @@ func (mt *metaTable) saveSegmentMeta(seg *pb.SegmentMeta) error {
|
|||
return mt.client.Save("/segment/"+strconv.FormatInt(seg.SegmentID, 10), string(segBytes))
|
||||
}
|
||||
|
||||
// mt.ddLock.Lock() before call this function
|
||||
func (mt *metaTable) deleteSegmentMeta(segID UniqueID) error {
|
||||
_, ok := mt.segID2Meta[segID]
|
||||
|
||||
if ok {
|
||||
delete(mt.segID2Meta, segID)
|
||||
}
|
||||
|
||||
return mt.client.Remove("/segment/" + strconv.FormatInt(segID, 10))
|
||||
}
|
||||
|
||||
// mt.ddLock.Lock() before call this function
|
||||
func (mt *metaTable) saveCollectionAndDeleteSegmentsMeta(coll *pb.CollectionMeta, segIDs []UniqueID) error {
|
||||
segIDStrs := make([]string, 0, len(segIDs))
|
||||
|
|
|
@ -73,7 +73,7 @@ func TestMetaTable_Collection(t *testing.T) {
|
|||
Name: "coll2",
|
||||
},
|
||||
CreateTime: 0,
|
||||
SegmentIDs: []UniqueID{1},
|
||||
SegmentIDs: []UniqueID{},
|
||||
PartitionTags: []string{"1"},
|
||||
}
|
||||
segID1 := pb.SegmentMeta{
|
||||
|
@ -121,11 +121,16 @@ func TestMetaTable_Collection(t *testing.T) {
|
|||
assert.Nil(t, err)
|
||||
err = meta.AddSegment(&segID3)
|
||||
assert.Nil(t, err)
|
||||
getColMeta, err := meta.GetCollectionByName(colMeta.Schema.Name)
|
||||
getColMeta, err := meta.GetCollectionByName("coll5")
|
||||
assert.NotNil(t, err)
|
||||
assert.Nil(t, getColMeta)
|
||||
getColMeta, err = meta.GetCollectionByName(colMeta.Schema.Name)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, 3, len(getColMeta.SegmentIDs))
|
||||
err = meta.DeleteCollection(colMeta.ID)
|
||||
assert.Nil(t, err)
|
||||
err = meta.DeleteCollection(500)
|
||||
assert.NotNil(t, err)
|
||||
hasCollection = meta.HasCollection(colMeta.ID)
|
||||
assert.False(t, hasCollection)
|
||||
_, err = meta.GetSegmentByID(segID1.SegmentID)
|
||||
|
@ -193,10 +198,14 @@ func TestMetaTable_DeletePartition(t *testing.T) {
|
|||
}
|
||||
err = meta.AddCollection(&colMeta)
|
||||
assert.Nil(t, err)
|
||||
err = meta.AddPartition(500, "p1")
|
||||
assert.NotNil(t, err)
|
||||
err = meta.AddPartition(colMeta.ID, "p1")
|
||||
assert.Nil(t, err)
|
||||
err = meta.AddPartition(colMeta.ID, "p2")
|
||||
assert.Nil(t, err)
|
||||
err = meta.AddPartition(colMeta.ID, "p2")
|
||||
assert.NotNil(t, err)
|
||||
err = meta.AddSegment(&segID1)
|
||||
assert.Nil(t, err)
|
||||
err = meta.AddSegment(&segID2)
|
||||
|
@ -209,6 +218,8 @@ func TestMetaTable_DeletePartition(t *testing.T) {
|
|||
assert.Equal(t, 3, len(afterCollMeta.SegmentIDs))
|
||||
err = meta.DeletePartition(100, "p1")
|
||||
assert.Nil(t, err)
|
||||
err = meta.DeletePartition(500, "p1")
|
||||
assert.NotNil(t, err)
|
||||
afterCollMeta, err = meta.GetCollectionByName("coll1")
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, 1, len(afterCollMeta.PartitionTags))
|
||||
|
@ -285,12 +296,16 @@ func TestMetaTable_Segment(t *testing.T) {
|
|||
assert.Equal(t, &segMeta, getSegMeta)
|
||||
err = meta.CloseSegment(segMeta.SegmentID, Timestamp(11), 111)
|
||||
assert.Nil(t, err)
|
||||
err = meta.CloseSegment(1000, Timestamp(11), 111)
|
||||
assert.NotNil(t, err)
|
||||
getSegMeta, err = meta.GetSegmentByID(segMeta.SegmentID)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, getSegMeta.NumRows, int64(111))
|
||||
assert.Equal(t, getSegMeta.CloseTime, uint64(11))
|
||||
err = meta.DeleteSegment(segMeta.SegmentID)
|
||||
assert.Nil(t, err)
|
||||
err = meta.DeleteSegment(1000)
|
||||
assert.NotNil(t, err)
|
||||
getSegMeta, err = meta.GetSegmentByID(segMeta.SegmentID)
|
||||
assert.Nil(t, getSegMeta)
|
||||
assert.NotNil(t, err)
|
||||
|
|
|
@ -0,0 +1,302 @@
|
|||
package master
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
|
||||
)
|
||||
|
||||
func TestMaster_CreateCollectionTask(t *testing.T) {
|
||||
req := internalpb.CreateCollectionRequest{
|
||||
MsgType: internalpb.MsgType_kCreateCollection,
|
||||
ReqID: 1,
|
||||
Timestamp: 11,
|
||||
ProxyID: 1,
|
||||
Schema: nil,
|
||||
}
|
||||
var collectionTask task = &createCollectionTask{
|
||||
req: &req,
|
||||
baseTask: baseTask{},
|
||||
}
|
||||
assert.Equal(t, internalpb.MsgType_kCreateCollection, collectionTask.Type())
|
||||
ts, err := collectionTask.Ts()
|
||||
assert.Equal(t, uint64(11), ts)
|
||||
assert.Nil(t, err)
|
||||
|
||||
collectionTask = &createCollectionTask{
|
||||
req: nil,
|
||||
baseTask: baseTask{},
|
||||
}
|
||||
|
||||
assert.Equal(t, internalpb.MsgType_kNone, collectionTask.Type())
|
||||
ts, err = collectionTask.Ts()
|
||||
assert.Equal(t, uint64(0), ts)
|
||||
assert.NotNil(t, err)
|
||||
err = collectionTask.Execute()
|
||||
assert.NotNil(t, err)
|
||||
}
|
||||
|
||||
func TestMaster_DropCollectionTask(t *testing.T) {
|
||||
req := internalpb.DropCollectionRequest{
|
||||
MsgType: internalpb.MsgType_kDropPartition,
|
||||
ReqID: 1,
|
||||
Timestamp: 11,
|
||||
ProxyID: 1,
|
||||
CollectionName: nil,
|
||||
}
|
||||
var collectionTask task = &dropCollectionTask{
|
||||
req: &req,
|
||||
baseTask: baseTask{},
|
||||
}
|
||||
assert.Equal(t, internalpb.MsgType_kDropPartition, collectionTask.Type())
|
||||
ts, err := collectionTask.Ts()
|
||||
assert.Equal(t, uint64(11), ts)
|
||||
assert.Nil(t, err)
|
||||
|
||||
collectionTask = &dropCollectionTask{
|
||||
req: nil,
|
||||
baseTask: baseTask{},
|
||||
}
|
||||
|
||||
assert.Equal(t, internalpb.MsgType_kNone, collectionTask.Type())
|
||||
ts, err = collectionTask.Ts()
|
||||
assert.Equal(t, uint64(0), ts)
|
||||
assert.NotNil(t, err)
|
||||
err = collectionTask.Execute()
|
||||
assert.NotNil(t, err)
|
||||
}
|
||||
|
||||
func TestMaster_HasCollectionTask(t *testing.T) {
|
||||
req := internalpb.HasCollectionRequest{
|
||||
MsgType: internalpb.MsgType_kHasCollection,
|
||||
ReqID: 1,
|
||||
Timestamp: 11,
|
||||
ProxyID: 1,
|
||||
CollectionName: nil,
|
||||
}
|
||||
var collectionTask task = &hasCollectionTask{
|
||||
req: &req,
|
||||
baseTask: baseTask{},
|
||||
}
|
||||
assert.Equal(t, internalpb.MsgType_kHasCollection, collectionTask.Type())
|
||||
ts, err := collectionTask.Ts()
|
||||
assert.Equal(t, uint64(11), ts)
|
||||
assert.Nil(t, err)
|
||||
|
||||
collectionTask = &hasCollectionTask{
|
||||
req: nil,
|
||||
baseTask: baseTask{},
|
||||
}
|
||||
|
||||
assert.Equal(t, internalpb.MsgType_kNone, collectionTask.Type())
|
||||
ts, err = collectionTask.Ts()
|
||||
assert.Equal(t, uint64(0), ts)
|
||||
assert.NotNil(t, err)
|
||||
err = collectionTask.Execute()
|
||||
assert.NotNil(t, err)
|
||||
}
|
||||
|
||||
func TestMaster_ShowCollectionTask(t *testing.T) {
|
||||
req := internalpb.ShowCollectionRequest{
|
||||
MsgType: internalpb.MsgType_kShowCollections,
|
||||
ReqID: 1,
|
||||
Timestamp: 11,
|
||||
ProxyID: 1,
|
||||
}
|
||||
var collectionTask task = &showCollectionsTask{
|
||||
req: &req,
|
||||
baseTask: baseTask{},
|
||||
}
|
||||
assert.Equal(t, internalpb.MsgType_kShowCollections, collectionTask.Type())
|
||||
ts, err := collectionTask.Ts()
|
||||
assert.Equal(t, uint64(11), ts)
|
||||
assert.Nil(t, err)
|
||||
|
||||
collectionTask = &showCollectionsTask{
|
||||
req: nil,
|
||||
baseTask: baseTask{},
|
||||
}
|
||||
|
||||
assert.Equal(t, internalpb.MsgType_kNone, collectionTask.Type())
|
||||
ts, err = collectionTask.Ts()
|
||||
assert.Equal(t, uint64(0), ts)
|
||||
assert.NotNil(t, err)
|
||||
err = collectionTask.Execute()
|
||||
assert.NotNil(t, err)
|
||||
}
|
||||
|
||||
func TestMaster_DescribeCollectionTask(t *testing.T) {
|
||||
req := internalpb.DescribeCollectionRequest{
|
||||
MsgType: internalpb.MsgType_kDescribeCollection,
|
||||
ReqID: 1,
|
||||
Timestamp: 11,
|
||||
ProxyID: 1,
|
||||
CollectionName: nil,
|
||||
}
|
||||
var collectionTask task = &describeCollectionTask{
|
||||
req: &req,
|
||||
baseTask: baseTask{},
|
||||
}
|
||||
assert.Equal(t, internalpb.MsgType_kDescribeCollection, collectionTask.Type())
|
||||
ts, err := collectionTask.Ts()
|
||||
assert.Equal(t, uint64(11), ts)
|
||||
assert.Nil(t, err)
|
||||
|
||||
collectionTask = &describeCollectionTask{
|
||||
req: nil,
|
||||
baseTask: baseTask{},
|
||||
}
|
||||
|
||||
assert.Equal(t, internalpb.MsgType_kNone, collectionTask.Type())
|
||||
ts, err = collectionTask.Ts()
|
||||
assert.Equal(t, uint64(0), ts)
|
||||
assert.NotNil(t, err)
|
||||
err = collectionTask.Execute()
|
||||
assert.NotNil(t, err)
|
||||
}
|
||||
|
||||
func TestMaster_CreatePartitionTask(t *testing.T) {
|
||||
req := internalpb.CreatePartitionRequest{
|
||||
MsgType: internalpb.MsgType_kCreatePartition,
|
||||
ReqID: 1,
|
||||
Timestamp: 11,
|
||||
ProxyID: 1,
|
||||
PartitionName: nil,
|
||||
}
|
||||
var partitionTask task = &createPartitionTask{
|
||||
req: &req,
|
||||
baseTask: baseTask{},
|
||||
}
|
||||
assert.Equal(t, internalpb.MsgType_kCreatePartition, partitionTask.Type())
|
||||
ts, err := partitionTask.Ts()
|
||||
assert.Equal(t, uint64(11), ts)
|
||||
assert.Nil(t, err)
|
||||
|
||||
partitionTask = &createPartitionTask{
|
||||
req: nil,
|
||||
baseTask: baseTask{},
|
||||
}
|
||||
|
||||
assert.Equal(t, internalpb.MsgType_kNone, partitionTask.Type())
|
||||
ts, err = partitionTask.Ts()
|
||||
assert.Equal(t, uint64(0), ts)
|
||||
assert.NotNil(t, err)
|
||||
err = partitionTask.Execute()
|
||||
assert.NotNil(t, err)
|
||||
}
|
||||
func TestMaster_DropPartitionTask(t *testing.T) {
|
||||
req := internalpb.DropPartitionRequest{
|
||||
MsgType: internalpb.MsgType_kDropPartition,
|
||||
ReqID: 1,
|
||||
Timestamp: 11,
|
||||
ProxyID: 1,
|
||||
PartitionName: nil,
|
||||
}
|
||||
var partitionTask task = &dropPartitionTask{
|
||||
req: &req,
|
||||
baseTask: baseTask{},
|
||||
}
|
||||
assert.Equal(t, internalpb.MsgType_kDropPartition, partitionTask.Type())
|
||||
ts, err := partitionTask.Ts()
|
||||
assert.Equal(t, uint64(11), ts)
|
||||
assert.Nil(t, err)
|
||||
|
||||
partitionTask = &dropPartitionTask{
|
||||
req: nil,
|
||||
baseTask: baseTask{},
|
||||
}
|
||||
|
||||
assert.Equal(t, internalpb.MsgType_kNone, partitionTask.Type())
|
||||
ts, err = partitionTask.Ts()
|
||||
assert.Equal(t, uint64(0), ts)
|
||||
assert.NotNil(t, err)
|
||||
err = partitionTask.Execute()
|
||||
assert.NotNil(t, err)
|
||||
}
|
||||
func TestMaster_HasPartitionTask(t *testing.T) {
|
||||
req := internalpb.HasPartitionRequest{
|
||||
MsgType: internalpb.MsgType_kHasPartition,
|
||||
ReqID: 1,
|
||||
Timestamp: 11,
|
||||
ProxyID: 1,
|
||||
PartitionName: nil,
|
||||
}
|
||||
var partitionTask task = &hasPartitionTask{
|
||||
req: &req,
|
||||
baseTask: baseTask{},
|
||||
}
|
||||
assert.Equal(t, internalpb.MsgType_kHasPartition, partitionTask.Type())
|
||||
ts, err := partitionTask.Ts()
|
||||
assert.Equal(t, uint64(11), ts)
|
||||
assert.Nil(t, err)
|
||||
|
||||
partitionTask = &hasPartitionTask{
|
||||
req: nil,
|
||||
baseTask: baseTask{},
|
||||
}
|
||||
|
||||
assert.Equal(t, internalpb.MsgType_kNone, partitionTask.Type())
|
||||
ts, err = partitionTask.Ts()
|
||||
assert.Equal(t, uint64(0), ts)
|
||||
assert.NotNil(t, err)
|
||||
err = partitionTask.Execute()
|
||||
assert.NotNil(t, err)
|
||||
}
|
||||
func TestMaster_DescribePartitionTask(t *testing.T) {
|
||||
req := internalpb.DescribePartitionRequest{
|
||||
MsgType: internalpb.MsgType_kDescribePartition,
|
||||
ReqID: 1,
|
||||
Timestamp: 11,
|
||||
ProxyID: 1,
|
||||
PartitionName: nil,
|
||||
}
|
||||
var partitionTask task = &describePartitionTask{
|
||||
req: &req,
|
||||
baseTask: baseTask{},
|
||||
}
|
||||
assert.Equal(t, internalpb.MsgType_kDescribePartition, partitionTask.Type())
|
||||
ts, err := partitionTask.Ts()
|
||||
assert.Equal(t, uint64(11), ts)
|
||||
assert.Nil(t, err)
|
||||
|
||||
partitionTask = &describePartitionTask{
|
||||
req: nil,
|
||||
baseTask: baseTask{},
|
||||
}
|
||||
|
||||
assert.Equal(t, internalpb.MsgType_kNone, partitionTask.Type())
|
||||
ts, err = partitionTask.Ts()
|
||||
assert.Equal(t, uint64(0), ts)
|
||||
assert.NotNil(t, err)
|
||||
err = partitionTask.Execute()
|
||||
assert.NotNil(t, err)
|
||||
}
|
||||
func TestMaster_ShowPartitionTask(t *testing.T) {
|
||||
req := internalpb.ShowPartitionRequest{
|
||||
MsgType: internalpb.MsgType_kShowPartitions,
|
||||
ReqID: 1,
|
||||
Timestamp: 11,
|
||||
ProxyID: 1,
|
||||
}
|
||||
var partitionTask task = &showPartitionTask{
|
||||
req: &req,
|
||||
baseTask: baseTask{},
|
||||
}
|
||||
assert.Equal(t, internalpb.MsgType_kShowPartitions, partitionTask.Type())
|
||||
ts, err := partitionTask.Ts()
|
||||
assert.Equal(t, uint64(11), ts)
|
||||
assert.Nil(t, err)
|
||||
|
||||
partitionTask = &showPartitionTask{
|
||||
req: nil,
|
||||
baseTask: baseTask{},
|
||||
}
|
||||
|
||||
assert.Equal(t, internalpb.MsgType_kNone, partitionTask.Type())
|
||||
ts, err = partitionTask.Ts()
|
||||
assert.Equal(t, uint64(0), ts)
|
||||
assert.NotNil(t, err)
|
||||
err = partitionTask.Execute()
|
||||
assert.NotNil(t, err)
|
||||
}
|
|
@ -0,0 +1,39 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/errors"
|
||||
)
|
||||
|
||||
type Condition interface {
|
||||
WaitToFinish() error
|
||||
Notify(err error)
|
||||
}
|
||||
|
||||
type TaskCondition struct {
|
||||
done chan error
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func (tc *TaskCondition) WaitToFinish() error {
|
||||
for {
|
||||
select {
|
||||
case <-tc.ctx.Done():
|
||||
return errors.New("timeout")
|
||||
case err := <-tc.done:
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (tc *TaskCondition) Notify(err error) {
|
||||
tc.done <- err
|
||||
}
|
||||
|
||||
func NewTaskCondition(ctx context.Context) *TaskCondition {
|
||||
return &TaskCondition{
|
||||
done: make(chan error),
|
||||
ctx: ctx,
|
||||
}
|
||||
}
|
|
@ -5,6 +5,7 @@ import (
|
|||
"errors"
|
||||
"log"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/zilliztech/milvus-distributed/internal/msgstream"
|
||||
|
@ -14,8 +15,13 @@ import (
|
|||
"github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
|
||||
)
|
||||
|
||||
const (
|
||||
reqTimeoutInterval = time.Second * 2
|
||||
)
|
||||
|
||||
func (p *Proxy) Insert(ctx context.Context, in *servicepb.RowBatch) (*servicepb.IntegerRangeResponse, error) {
|
||||
it := &InsertTask{
|
||||
Condition: NewTaskCondition(ctx),
|
||||
BaseInsertTask: BaseInsertTask{
|
||||
BaseMsg: msgstream.BaseMsg{
|
||||
HashValues: in.HashKeys,
|
||||
|
@ -27,14 +33,14 @@ func (p *Proxy) Insert(ctx context.Context, in *servicepb.RowBatch) (*servicepb.
|
|||
RowData: in.RowData,
|
||||
},
|
||||
},
|
||||
done: make(chan error),
|
||||
manipulationMsgStream: p.manipulationMsgStream,
|
||||
}
|
||||
|
||||
it.ctx, it.cancel = context.WithCancel(ctx)
|
||||
var cancel func()
|
||||
it.ctx, cancel = context.WithTimeout(ctx, reqTimeoutInterval)
|
||||
// TODO: req_id, segment_id, channel_id, proxy_id, timestamps, row_ids
|
||||
|
||||
defer it.cancel()
|
||||
defer cancel()
|
||||
|
||||
fn := func() error {
|
||||
select {
|
||||
|
@ -70,18 +76,19 @@ func (p *Proxy) Insert(ctx context.Context, in *servicepb.RowBatch) (*servicepb.
|
|||
|
||||
func (p *Proxy) CreateCollection(ctx context.Context, req *schemapb.CollectionSchema) (*commonpb.Status, error) {
|
||||
cct := &CreateCollectionTask{
|
||||
Condition: NewTaskCondition(ctx),
|
||||
CreateCollectionRequest: internalpb.CreateCollectionRequest{
|
||||
MsgType: internalpb.MsgType_kCreateCollection,
|
||||
Schema: &commonpb.Blob{},
|
||||
// TODO: req_id, timestamp, proxy_id
|
||||
},
|
||||
masterClient: p.masterClient,
|
||||
done: make(chan error),
|
||||
}
|
||||
schemaBytes, _ := proto.Marshal(req)
|
||||
cct.CreateCollectionRequest.Schema.Value = schemaBytes
|
||||
cct.ctx, cct.cancel = context.WithCancel(ctx)
|
||||
defer cct.cancel()
|
||||
var cancel func()
|
||||
cct.ctx, cancel = context.WithTimeout(ctx, reqTimeoutInterval)
|
||||
defer cancel()
|
||||
|
||||
fn := func() error {
|
||||
select {
|
||||
|
@ -112,23 +119,24 @@ func (p *Proxy) CreateCollection(ctx context.Context, req *schemapb.CollectionSc
|
|||
|
||||
func (p *Proxy) Search(ctx context.Context, req *servicepb.Query) (*servicepb.QueryResult, error) {
|
||||
qt := &QueryTask{
|
||||
Condition: NewTaskCondition(ctx),
|
||||
SearchRequest: internalpb.SearchRequest{
|
||||
MsgType: internalpb.MsgType_kSearch,
|
||||
Query: &commonpb.Blob{},
|
||||
// TODO: req_id, proxy_id, timestamp, result_channel_id
|
||||
},
|
||||
queryMsgStream: p.queryMsgStream,
|
||||
done: make(chan error),
|
||||
resultBuf: make(chan []*internalpb.SearchResult),
|
||||
}
|
||||
qt.ctx, qt.cancel = context.WithCancel(ctx)
|
||||
var cancel func()
|
||||
qt.ctx, cancel = context.WithTimeout(ctx, reqTimeoutInterval)
|
||||
// Hack with test, shit here but no other ways
|
||||
reqID, _ := strconv.Atoi(req.CollectionName[len(req.CollectionName)-1:])
|
||||
qt.ReqID = int64(reqID)
|
||||
queryBytes, _ := proto.Marshal(req)
|
||||
qt.SearchRequest.Query.Value = queryBytes
|
||||
log.Printf("grpc address of query task: %p", qt)
|
||||
defer qt.cancel()
|
||||
defer cancel()
|
||||
|
||||
fn := func() error {
|
||||
select {
|
||||
|
@ -163,16 +171,17 @@ func (p *Proxy) Search(ctx context.Context, req *servicepb.Query) (*servicepb.Qu
|
|||
|
||||
func (p *Proxy) DropCollection(ctx context.Context, req *servicepb.CollectionName) (*commonpb.Status, error) {
|
||||
dct := &DropCollectionTask{
|
||||
Condition: NewTaskCondition(ctx),
|
||||
DropCollectionRequest: internalpb.DropCollectionRequest{
|
||||
MsgType: internalpb.MsgType_kDropCollection,
|
||||
// TODO: req_id, timestamp, proxy_id
|
||||
CollectionName: req,
|
||||
},
|
||||
masterClient: p.masterClient,
|
||||
done: make(chan error),
|
||||
}
|
||||
dct.ctx, dct.cancel = context.WithCancel(ctx)
|
||||
defer dct.cancel()
|
||||
var cancel func()
|
||||
dct.ctx, cancel = context.WithTimeout(ctx, reqTimeoutInterval)
|
||||
defer cancel()
|
||||
|
||||
fn := func() error {
|
||||
select {
|
||||
|
@ -203,16 +212,17 @@ func (p *Proxy) DropCollection(ctx context.Context, req *servicepb.CollectionNam
|
|||
|
||||
func (p *Proxy) HasCollection(ctx context.Context, req *servicepb.CollectionName) (*servicepb.BoolResponse, error) {
|
||||
hct := &HasCollectionTask{
|
||||
Condition: NewTaskCondition(ctx),
|
||||
HasCollectionRequest: internalpb.HasCollectionRequest{
|
||||
MsgType: internalpb.MsgType_kHasCollection,
|
||||
// TODO: req_id, timestamp, proxy_id
|
||||
CollectionName: req,
|
||||
},
|
||||
masterClient: p.masterClient,
|
||||
done: make(chan error),
|
||||
}
|
||||
hct.ctx, hct.cancel = context.WithCancel(ctx)
|
||||
defer hct.cancel()
|
||||
var cancel func()
|
||||
hct.ctx, cancel = context.WithTimeout(ctx, reqTimeoutInterval)
|
||||
defer cancel()
|
||||
|
||||
fn := func() error {
|
||||
select {
|
||||
|
@ -247,16 +257,17 @@ func (p *Proxy) HasCollection(ctx context.Context, req *servicepb.CollectionName
|
|||
|
||||
func (p *Proxy) DescribeCollection(ctx context.Context, req *servicepb.CollectionName) (*servicepb.CollectionDescription, error) {
|
||||
dct := &DescribeCollectionTask{
|
||||
Condition: NewTaskCondition(ctx),
|
||||
DescribeCollectionRequest: internalpb.DescribeCollectionRequest{
|
||||
MsgType: internalpb.MsgType_kDescribeCollection,
|
||||
// TODO: req_id, timestamp, proxy_id
|
||||
CollectionName: req,
|
||||
},
|
||||
masterClient: p.masterClient,
|
||||
done: make(chan error),
|
||||
}
|
||||
dct.ctx, dct.cancel = context.WithCancel(ctx)
|
||||
defer dct.cancel()
|
||||
var cancel func()
|
||||
dct.ctx, cancel = context.WithTimeout(ctx, reqTimeoutInterval)
|
||||
defer cancel()
|
||||
|
||||
fn := func() error {
|
||||
select {
|
||||
|
@ -291,15 +302,16 @@ func (p *Proxy) DescribeCollection(ctx context.Context, req *servicepb.Collectio
|
|||
|
||||
func (p *Proxy) ShowCollections(ctx context.Context, req *commonpb.Empty) (*servicepb.StringListResponse, error) {
|
||||
sct := &ShowCollectionsTask{
|
||||
Condition: NewTaskCondition(ctx),
|
||||
ShowCollectionRequest: internalpb.ShowCollectionRequest{
|
||||
MsgType: internalpb.MsgType_kDescribeCollection,
|
||||
// TODO: req_id, timestamp, proxy_id
|
||||
},
|
||||
masterClient: p.masterClient,
|
||||
done: make(chan error),
|
||||
}
|
||||
sct.ctx, sct.cancel = context.WithCancel(ctx)
|
||||
defer sct.cancel()
|
||||
var cancel func()
|
||||
sct.ctx, cancel = context.WithTimeout(ctx, reqTimeoutInterval)
|
||||
defer cancel()
|
||||
|
||||
fn := func() error {
|
||||
select {
|
||||
|
|
|
@ -36,11 +36,12 @@ var masterServer *master.Master
|
|||
var testNum = 10
|
||||
|
||||
func startMaster(ctx context.Context) {
|
||||
etcdAddr, err := Params.EtcdAddress()
|
||||
master.Init()
|
||||
etcdAddr, err := masterParam.Params.EtcdAddress()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
rootPath, err := Params.EtcdRootPath()
|
||||
rootPath, err := masterParam.Params.EtcdRootPath()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -52,7 +53,6 @@ func startMaster(ctx context.Context) {
|
|||
if err != nil {
|
||||
log.Print("create server failed", zap.Error(err))
|
||||
}
|
||||
masterParam.Params.InitParamTable()
|
||||
if err := svr.Run(int64(masterParam.Params.Port())); err != nil {
|
||||
log.Fatal("run server failed", zap.Error(err))
|
||||
}
|
||||
|
|
|
@ -29,12 +29,11 @@ type BaseInsertTask = msgstream.InsertMsg
|
|||
|
||||
type InsertTask struct {
|
||||
BaseInsertTask
|
||||
Condition
|
||||
ts Timestamp
|
||||
done chan error
|
||||
result *servicepb.IntegerRangeResponse
|
||||
manipulationMsgStream *msgstream.PulsarMsgStream
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
func (it *InsertTask) SetTs(ts Timestamp) {
|
||||
|
@ -86,29 +85,12 @@ func (it *InsertTask) PostExecute() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (it *InsertTask) WaitToFinish() error {
|
||||
for {
|
||||
select {
|
||||
case err := <-it.done:
|
||||
return err
|
||||
case <-it.ctx.Done():
|
||||
log.Print("wait to finish failed, timeout!")
|
||||
return errors.New("wait to finish failed, timeout")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (it *InsertTask) Notify(err error) {
|
||||
it.done <- err
|
||||
}
|
||||
|
||||
type CreateCollectionTask struct {
|
||||
Condition
|
||||
internalpb.CreateCollectionRequest
|
||||
masterClient masterpb.MasterClient
|
||||
done chan error
|
||||
result *commonpb.Status
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
func (cct *CreateCollectionTask) ID() UniqueID {
|
||||
|
@ -153,29 +135,12 @@ func (cct *CreateCollectionTask) PostExecute() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (cct *CreateCollectionTask) WaitToFinish() error {
|
||||
for {
|
||||
select {
|
||||
case err := <-cct.done:
|
||||
return err
|
||||
case <-cct.ctx.Done():
|
||||
log.Print("wait to finish failed, timeout!")
|
||||
return errors.New("wait to finish failed, timeout")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (cct *CreateCollectionTask) Notify(err error) {
|
||||
cct.done <- err
|
||||
}
|
||||
|
||||
type DropCollectionTask struct {
|
||||
Condition
|
||||
internalpb.DropCollectionRequest
|
||||
masterClient masterpb.MasterClient
|
||||
done chan error
|
||||
result *commonpb.Status
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
func (dct *DropCollectionTask) ID() UniqueID {
|
||||
|
@ -220,30 +185,13 @@ func (dct *DropCollectionTask) PostExecute() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (dct *DropCollectionTask) WaitToFinish() error {
|
||||
for {
|
||||
select {
|
||||
case err := <-dct.done:
|
||||
return err
|
||||
case <-dct.ctx.Done():
|
||||
log.Print("wait to finish failed, timeout!")
|
||||
return errors.New("wait to finish failed, timeout")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (dct *DropCollectionTask) Notify(err error) {
|
||||
dct.done <- err
|
||||
}
|
||||
|
||||
type QueryTask struct {
|
||||
Condition
|
||||
internalpb.SearchRequest
|
||||
queryMsgStream *msgstream.PulsarMsgStream
|
||||
done chan error
|
||||
resultBuf chan []*internalpb.SearchResult
|
||||
result *servicepb.QueryResult
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
func (qt *QueryTask) ID() UniqueID {
|
||||
|
@ -345,29 +293,12 @@ func (qt *QueryTask) PostExecute() error {
|
|||
//return nil
|
||||
}
|
||||
|
||||
func (qt *QueryTask) WaitToFinish() error {
|
||||
for {
|
||||
select {
|
||||
case err := <-qt.done:
|
||||
return err
|
||||
case <-qt.ctx.Done():
|
||||
log.Print("wait to finish failed, timeout!")
|
||||
return errors.New("wait to finish failed, timeout")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (qt *QueryTask) Notify(err error) {
|
||||
qt.done <- err
|
||||
}
|
||||
|
||||
type HasCollectionTask struct {
|
||||
Condition
|
||||
internalpb.HasCollectionRequest
|
||||
masterClient masterpb.MasterClient
|
||||
done chan error
|
||||
result *servicepb.BoolResponse
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
func (hct *HasCollectionTask) ID() UniqueID {
|
||||
|
@ -415,29 +346,12 @@ func (hct *HasCollectionTask) PostExecute() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (hct *HasCollectionTask) WaitToFinish() error {
|
||||
for {
|
||||
select {
|
||||
case err := <-hct.done:
|
||||
return err
|
||||
case <-hct.ctx.Done():
|
||||
log.Print("wait to finish failed, timeout!")
|
||||
return errors.New("wait to finish failed, timeout")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (hct *HasCollectionTask) Notify(err error) {
|
||||
hct.done <- err
|
||||
}
|
||||
|
||||
type DescribeCollectionTask struct {
|
||||
Condition
|
||||
internalpb.DescribeCollectionRequest
|
||||
masterClient masterpb.MasterClient
|
||||
done chan error
|
||||
result *servicepb.CollectionDescription
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
func (dct *DescribeCollectionTask) ID() UniqueID {
|
||||
|
@ -484,29 +398,12 @@ func (dct *DescribeCollectionTask) PostExecute() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (dct *DescribeCollectionTask) WaitToFinish() error {
|
||||
for {
|
||||
select {
|
||||
case err := <-dct.done:
|
||||
return err
|
||||
case <-dct.ctx.Done():
|
||||
log.Print("wait to finish failed, timeout!")
|
||||
return errors.New("wait to finish failed, timeout")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (dct *DescribeCollectionTask) Notify(err error) {
|
||||
dct.done <- err
|
||||
}
|
||||
|
||||
type ShowCollectionsTask struct {
|
||||
Condition
|
||||
internalpb.ShowCollectionRequest
|
||||
masterClient masterpb.MasterClient
|
||||
done chan error
|
||||
result *servicepb.StringListResponse
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
func (sct *ShowCollectionsTask) ID() UniqueID {
|
||||
|
@ -552,19 +449,3 @@ func (sct *ShowCollectionsTask) Execute() error {
|
|||
func (sct *ShowCollectionsTask) PostExecute() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sct *ShowCollectionsTask) WaitToFinish() error {
|
||||
for {
|
||||
select {
|
||||
case err := <-sct.done:
|
||||
return err
|
||||
case <-sct.ctx.Done():
|
||||
log.Print("wait to finish failed, timeout!")
|
||||
return errors.New("wait to finish failed, timeout")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (sct *ShowCollectionsTask) Notify(err error) {
|
||||
sct.done <- err
|
||||
}
|
||||
|
|
|
@ -20,7 +20,19 @@ import (
|
|||
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
|
||||
)
|
||||
|
||||
type container interface {
|
||||
/*
|
||||
* collectionReplica contains a in-memory local copy of persistent collections.
|
||||
* In common cases, the system has multiple query nodes. Data of a collection will be
|
||||
* distributed across all the available query nodes, and each query node's collectionReplica
|
||||
* will maintain its own share (only part of the collection).
|
||||
* Every replica tracks a value called tSafe which is the maximum timestamp that the replica
|
||||
* is up-to-date.
|
||||
*/
|
||||
type collectionReplica interface {
|
||||
// tSafe
|
||||
getTSafe() Timestamp
|
||||
setTSafe(t Timestamp)
|
||||
|
||||
// collection
|
||||
getCollectionNum() int
|
||||
addCollection(collMeta *etcdpb.CollectionMeta, colMetaBlob string) error
|
||||
|
@ -44,36 +56,51 @@ type container interface {
|
|||
hasSegment(segmentID UniqueID) bool
|
||||
}
|
||||
|
||||
// TODO: rename
|
||||
type colSegContainer struct {
|
||||
type collectionReplicaImpl struct {
|
||||
tSafeMu sync.Mutex
|
||||
tSafe Timestamp
|
||||
|
||||
mu sync.RWMutex
|
||||
collections []*Collection
|
||||
segments map[UniqueID]*Segment
|
||||
}
|
||||
|
||||
//----------------------------------------------------------------------------------------------------- collection
|
||||
func (container *colSegContainer) getCollectionNum() int {
|
||||
container.mu.RLock()
|
||||
defer container.mu.RUnlock()
|
||||
|
||||
return len(container.collections)
|
||||
//----------------------------------------------------------------------------------------------------- tSafe
|
||||
func (colReplica *collectionReplicaImpl) getTSafe() Timestamp {
|
||||
colReplica.tSafeMu.Lock()
|
||||
defer colReplica.tSafeMu.Unlock()
|
||||
return colReplica.tSafe
|
||||
}
|
||||
|
||||
func (container *colSegContainer) addCollection(collMeta *etcdpb.CollectionMeta, colMetaBlob string) error {
|
||||
container.mu.Lock()
|
||||
defer container.mu.Unlock()
|
||||
func (colReplica *collectionReplicaImpl) setTSafe(t Timestamp) {
|
||||
colReplica.tSafeMu.Lock()
|
||||
colReplica.tSafe = t
|
||||
colReplica.tSafeMu.Unlock()
|
||||
}
|
||||
|
||||
//----------------------------------------------------------------------------------------------------- collection
|
||||
func (colReplica *collectionReplicaImpl) getCollectionNum() int {
|
||||
colReplica.mu.RLock()
|
||||
defer colReplica.mu.RUnlock()
|
||||
|
||||
return len(colReplica.collections)
|
||||
}
|
||||
|
||||
func (colReplica *collectionReplicaImpl) addCollection(collMeta *etcdpb.CollectionMeta, colMetaBlob string) error {
|
||||
colReplica.mu.Lock()
|
||||
defer colReplica.mu.Unlock()
|
||||
|
||||
var newCollection = newCollection(collMeta, colMetaBlob)
|
||||
container.collections = append(container.collections, newCollection)
|
||||
colReplica.collections = append(colReplica.collections, newCollection)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (container *colSegContainer) removeCollection(collectionID UniqueID) error {
|
||||
collection, err := container.getCollectionByID(collectionID)
|
||||
func (colReplica *collectionReplicaImpl) removeCollection(collectionID UniqueID) error {
|
||||
collection, err := colReplica.getCollectionByID(collectionID)
|
||||
|
||||
container.mu.Lock()
|
||||
defer container.mu.Unlock()
|
||||
colReplica.mu.Lock()
|
||||
defer colReplica.mu.Unlock()
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -82,11 +109,11 @@ func (container *colSegContainer) removeCollection(collectionID UniqueID) error
|
|||
deleteCollection(collection)
|
||||
|
||||
tmpCollections := make([]*Collection, 0)
|
||||
for _, col := range container.collections {
|
||||
for _, col := range colReplica.collections {
|
||||
if col.ID() == collectionID {
|
||||
for _, p := range *col.Partitions() {
|
||||
for _, s := range *p.Segments() {
|
||||
delete(container.segments, s.ID())
|
||||
delete(colReplica.segments, s.ID())
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
@ -94,15 +121,15 @@ func (container *colSegContainer) removeCollection(collectionID UniqueID) error
|
|||
}
|
||||
}
|
||||
|
||||
container.collections = tmpCollections
|
||||
colReplica.collections = tmpCollections
|
||||
return nil
|
||||
}
|
||||
|
||||
func (container *colSegContainer) getCollectionByID(collectionID UniqueID) (*Collection, error) {
|
||||
container.mu.RLock()
|
||||
defer container.mu.RUnlock()
|
||||
func (colReplica *collectionReplicaImpl) getCollectionByID(collectionID UniqueID) (*Collection, error) {
|
||||
colReplica.mu.RLock()
|
||||
defer colReplica.mu.RUnlock()
|
||||
|
||||
for _, collection := range container.collections {
|
||||
for _, collection := range colReplica.collections {
|
||||
if collection.ID() == collectionID {
|
||||
return collection, nil
|
||||
}
|
||||
|
@ -111,11 +138,11 @@ func (container *colSegContainer) getCollectionByID(collectionID UniqueID) (*Col
|
|||
return nil, errors.New("cannot find collection, id = " + strconv.FormatInt(collectionID, 10))
|
||||
}
|
||||
|
||||
func (container *colSegContainer) getCollectionByName(collectionName string) (*Collection, error) {
|
||||
container.mu.RLock()
|
||||
defer container.mu.RUnlock()
|
||||
func (colReplica *collectionReplicaImpl) getCollectionByName(collectionName string) (*Collection, error) {
|
||||
colReplica.mu.RLock()
|
||||
defer colReplica.mu.RUnlock()
|
||||
|
||||
for _, collection := range container.collections {
|
||||
for _, collection := range colReplica.collections {
|
||||
if collection.Name() == collectionName {
|
||||
return collection, nil
|
||||
}
|
||||
|
@ -125,14 +152,14 @@ func (container *colSegContainer) getCollectionByName(collectionName string) (*C
|
|||
}
|
||||
|
||||
//----------------------------------------------------------------------------------------------------- partition
|
||||
func (container *colSegContainer) addPartition(collectionID UniqueID, partitionTag string) error {
|
||||
collection, err := container.getCollectionByID(collectionID)
|
||||
func (colReplica *collectionReplicaImpl) addPartition(collectionID UniqueID, partitionTag string) error {
|
||||
collection, err := colReplica.getCollectionByID(collectionID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
container.mu.Lock()
|
||||
defer container.mu.Unlock()
|
||||
colReplica.mu.Lock()
|
||||
defer colReplica.mu.Unlock()
|
||||
|
||||
var newPartition = newPartition(partitionTag)
|
||||
|
||||
|
@ -140,20 +167,20 @@ func (container *colSegContainer) addPartition(collectionID UniqueID, partitionT
|
|||
return nil
|
||||
}
|
||||
|
||||
func (container *colSegContainer) removePartition(collectionID UniqueID, partitionTag string) error {
|
||||
collection, err := container.getCollectionByID(collectionID)
|
||||
func (colReplica *collectionReplicaImpl) removePartition(collectionID UniqueID, partitionTag string) error {
|
||||
collection, err := colReplica.getCollectionByID(collectionID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
container.mu.Lock()
|
||||
defer container.mu.Unlock()
|
||||
colReplica.mu.Lock()
|
||||
defer colReplica.mu.Unlock()
|
||||
|
||||
var tmpPartitions = make([]*Partition, 0)
|
||||
for _, p := range *collection.Partitions() {
|
||||
if p.Tag() == partitionTag {
|
||||
for _, s := range *p.Segments() {
|
||||
delete(container.segments, s.ID())
|
||||
delete(colReplica.segments, s.ID())
|
||||
}
|
||||
} else {
|
||||
tmpPartitions = append(tmpPartitions, p)
|
||||
|
@ -164,14 +191,14 @@ func (container *colSegContainer) removePartition(collectionID UniqueID, partiti
|
|||
return nil
|
||||
}
|
||||
|
||||
func (container *colSegContainer) getPartitionByTag(collectionID UniqueID, partitionTag string) (*Partition, error) {
|
||||
collection, err := container.getCollectionByID(collectionID)
|
||||
func (colReplica *collectionReplicaImpl) getPartitionByTag(collectionID UniqueID, partitionTag string) (*Partition, error) {
|
||||
collection, err := colReplica.getCollectionByID(collectionID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
container.mu.RLock()
|
||||
defer container.mu.RUnlock()
|
||||
colReplica.mu.RLock()
|
||||
defer colReplica.mu.RUnlock()
|
||||
|
||||
for _, p := range *collection.Partitions() {
|
||||
if p.Tag() == partitionTag {
|
||||
|
@ -183,17 +210,17 @@ func (container *colSegContainer) getPartitionByTag(collectionID UniqueID, parti
|
|||
}
|
||||
|
||||
//----------------------------------------------------------------------------------------------------- segment
|
||||
func (container *colSegContainer) getSegmentNum() int {
|
||||
container.mu.RLock()
|
||||
defer container.mu.RUnlock()
|
||||
func (colReplica *collectionReplicaImpl) getSegmentNum() int {
|
||||
colReplica.mu.RLock()
|
||||
defer colReplica.mu.RUnlock()
|
||||
|
||||
return len(container.segments)
|
||||
return len(colReplica.segments)
|
||||
}
|
||||
|
||||
func (container *colSegContainer) getSegmentStatistics() *internalpb.QueryNodeSegStats {
|
||||
func (colReplica *collectionReplicaImpl) getSegmentStatistics() *internalpb.QueryNodeSegStats {
|
||||
var statisticData = make([]*internalpb.SegmentStats, 0)
|
||||
|
||||
for segmentID, segment := range container.segments {
|
||||
for segmentID, segment := range colReplica.segments {
|
||||
currentMemSize := segment.getMemSize()
|
||||
segment.lastMemSize = currentMemSize
|
||||
segmentNumOfRows := segment.getRowCount()
|
||||
|
@ -215,36 +242,36 @@ func (container *colSegContainer) getSegmentStatistics() *internalpb.QueryNodeSe
|
|||
}
|
||||
}
|
||||
|
||||
func (container *colSegContainer) addSegment(segmentID UniqueID, partitionTag string, collectionID UniqueID) error {
|
||||
collection, err := container.getCollectionByID(collectionID)
|
||||
func (colReplica *collectionReplicaImpl) addSegment(segmentID UniqueID, partitionTag string, collectionID UniqueID) error {
|
||||
collection, err := colReplica.getCollectionByID(collectionID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
partition, err := container.getPartitionByTag(collectionID, partitionTag)
|
||||
partition, err := colReplica.getPartitionByTag(collectionID, partitionTag)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
container.mu.Lock()
|
||||
defer container.mu.Unlock()
|
||||
colReplica.mu.Lock()
|
||||
defer colReplica.mu.Unlock()
|
||||
|
||||
var newSegment = newSegment(collection, segmentID)
|
||||
|
||||
container.segments[segmentID] = newSegment
|
||||
colReplica.segments[segmentID] = newSegment
|
||||
*partition.Segments() = append(*partition.Segments(), newSegment)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (container *colSegContainer) removeSegment(segmentID UniqueID) error {
|
||||
container.mu.Lock()
|
||||
defer container.mu.Unlock()
|
||||
func (colReplica *collectionReplicaImpl) removeSegment(segmentID UniqueID) error {
|
||||
colReplica.mu.Lock()
|
||||
defer colReplica.mu.Unlock()
|
||||
|
||||
var targetPartition *Partition
|
||||
var segmentIndex = -1
|
||||
|
||||
for _, col := range container.collections {
|
||||
for _, col := range colReplica.collections {
|
||||
for _, p := range *col.Partitions() {
|
||||
for i, s := range *p.Segments() {
|
||||
if s.ID() == segmentID {
|
||||
|
@ -255,7 +282,7 @@ func (container *colSegContainer) removeSegment(segmentID UniqueID) error {
|
|||
}
|
||||
}
|
||||
|
||||
delete(container.segments, segmentID)
|
||||
delete(colReplica.segments, segmentID)
|
||||
|
||||
if targetPartition != nil && segmentIndex > 0 {
|
||||
targetPartition.segments = append(targetPartition.segments[:segmentIndex], targetPartition.segments[segmentIndex+1:]...)
|
||||
|
@ -264,11 +291,11 @@ func (container *colSegContainer) removeSegment(segmentID UniqueID) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (container *colSegContainer) getSegmentByID(segmentID UniqueID) (*Segment, error) {
|
||||
container.mu.RLock()
|
||||
defer container.mu.RUnlock()
|
||||
func (colReplica *collectionReplicaImpl) getSegmentByID(segmentID UniqueID) (*Segment, error) {
|
||||
colReplica.mu.RLock()
|
||||
defer colReplica.mu.RUnlock()
|
||||
|
||||
targetSegment, ok := container.segments[segmentID]
|
||||
targetSegment, ok := colReplica.segments[segmentID]
|
||||
|
||||
if !ok {
|
||||
return nil, errors.New("cannot found segment with id = " + strconv.FormatInt(segmentID, 10))
|
||||
|
@ -277,11 +304,11 @@ func (container *colSegContainer) getSegmentByID(segmentID UniqueID) (*Segment,
|
|||
return targetSegment, nil
|
||||
}
|
||||
|
||||
func (container *colSegContainer) hasSegment(segmentID UniqueID) bool {
|
||||
container.mu.RLock()
|
||||
defer container.mu.RUnlock()
|
||||
func (colReplica *collectionReplicaImpl) hasSegment(segmentID UniqueID) bool {
|
||||
colReplica.mu.RLock()
|
||||
defer colReplica.mu.RUnlock()
|
||||
|
||||
_, ok := container.segments[segmentID]
|
||||
_, ok := colReplica.segments[segmentID]
|
||||
|
||||
return ok
|
||||
}
|
|
@ -15,8 +15,7 @@ import (
|
|||
//----------------------------------------------------------------------------------------------------- collection
|
||||
func TestColSegContainer_addCollection(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
pulsarURL := "pulsar://localhost:6650"
|
||||
node := NewQueryNode(ctx, 0, pulsarURL)
|
||||
node := NewQueryNode(ctx, 0)
|
||||
|
||||
collectionName := "collection0"
|
||||
fieldVec := schemapb.FieldSchema{
|
||||
|
@ -59,20 +58,19 @@ func TestColSegContainer_addCollection(t *testing.T) {
|
|||
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
|
||||
assert.NotEqual(t, "", collectionMetaBlob)
|
||||
|
||||
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
|
||||
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
|
||||
assert.NoError(t, err)
|
||||
|
||||
collection, err := (*node.container).getCollectionByName(collectionName)
|
||||
collection, err := (*node.replica).getCollectionByName(collectionName)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, collection.meta.Schema.Name, collectionName)
|
||||
assert.Equal(t, collection.meta.ID, UniqueID(0))
|
||||
assert.Equal(t, (*node.container).getCollectionNum(), 1)
|
||||
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
|
||||
}
|
||||
|
||||
func TestColSegContainer_removeCollection(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
pulsarURL := "pulsar://localhost:6650"
|
||||
node := NewQueryNode(ctx, 0, pulsarURL)
|
||||
node := NewQueryNode(ctx, 0)
|
||||
|
||||
collectionName := "collection0"
|
||||
collectionID := UniqueID(0)
|
||||
|
@ -116,25 +114,24 @@ func TestColSegContainer_removeCollection(t *testing.T) {
|
|||
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
|
||||
assert.NotEqual(t, "", collectionMetaBlob)
|
||||
|
||||
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
|
||||
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
|
||||
assert.NoError(t, err)
|
||||
|
||||
collection, err := (*node.container).getCollectionByName(collectionName)
|
||||
collection, err := (*node.replica).getCollectionByName(collectionName)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, collection.meta.Schema.Name, collectionName)
|
||||
assert.Equal(t, collection.meta.ID, UniqueID(0))
|
||||
assert.Equal(t, (*node.container).getCollectionNum(), 1)
|
||||
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
|
||||
|
||||
err = (*node.container).removeCollection(collectionID)
|
||||
err = (*node.replica).removeCollection(collectionID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, (*node.container).getCollectionNum(), 0)
|
||||
assert.Equal(t, (*node.replica).getCollectionNum(), 0)
|
||||
}
|
||||
|
||||
func TestColSegContainer_getCollectionByID(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
pulsarURL := "pulsar://localhost:6650"
|
||||
node := NewQueryNode(ctx, 0, pulsarURL)
|
||||
node := NewQueryNode(ctx, 0)
|
||||
|
||||
collectionName := "collection0"
|
||||
fieldVec := schemapb.FieldSchema{
|
||||
|
@ -177,17 +174,17 @@ func TestColSegContainer_getCollectionByID(t *testing.T) {
|
|||
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
|
||||
assert.NotEqual(t, "", collectionMetaBlob)
|
||||
|
||||
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
|
||||
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
|
||||
assert.NoError(t, err)
|
||||
|
||||
collection, err := (*node.container).getCollectionByName(collectionName)
|
||||
collection, err := (*node.replica).getCollectionByName(collectionName)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, collection.meta.Schema.Name, collectionName)
|
||||
assert.Equal(t, collection.meta.ID, UniqueID(0))
|
||||
assert.Equal(t, (*node.container).getCollectionNum(), 1)
|
||||
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
|
||||
|
||||
targetCollection, err := (*node.container).getCollectionByID(UniqueID(0))
|
||||
targetCollection, err := (*node.replica).getCollectionByID(UniqueID(0))
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, targetCollection)
|
||||
assert.Equal(t, targetCollection.meta.Schema.Name, "collection0")
|
||||
|
@ -196,8 +193,7 @@ func TestColSegContainer_getCollectionByID(t *testing.T) {
|
|||
|
||||
func TestColSegContainer_getCollectionByName(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
pulsarURL := "pulsar://localhost:6650"
|
||||
node := NewQueryNode(ctx, 0, pulsarURL)
|
||||
node := NewQueryNode(ctx, 0)
|
||||
|
||||
collectionName := "collection0"
|
||||
fieldVec := schemapb.FieldSchema{
|
||||
|
@ -240,17 +236,17 @@ func TestColSegContainer_getCollectionByName(t *testing.T) {
|
|||
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
|
||||
assert.NotEqual(t, "", collectionMetaBlob)
|
||||
|
||||
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
|
||||
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
|
||||
assert.NoError(t, err)
|
||||
|
||||
collection, err := (*node.container).getCollectionByName(collectionName)
|
||||
collection, err := (*node.replica).getCollectionByName(collectionName)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, collection.meta.Schema.Name, collectionName)
|
||||
assert.Equal(t, collection.meta.ID, UniqueID(0))
|
||||
assert.Equal(t, (*node.container).getCollectionNum(), 1)
|
||||
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
|
||||
|
||||
targetCollection, err := (*node.container).getCollectionByName("collection0")
|
||||
targetCollection, err := (*node.replica).getCollectionByName("collection0")
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, targetCollection)
|
||||
assert.Equal(t, targetCollection.meta.Schema.Name, "collection0")
|
||||
|
@ -260,8 +256,7 @@ func TestColSegContainer_getCollectionByName(t *testing.T) {
|
|||
//----------------------------------------------------------------------------------------------------- partition
|
||||
func TestColSegContainer_addPartition(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
pulsarURL := "pulsar://localhost:6650"
|
||||
node := NewQueryNode(ctx, 0, pulsarURL)
|
||||
node := NewQueryNode(ctx, 0)
|
||||
|
||||
collectionName := "collection0"
|
||||
collectionID := UniqueID(0)
|
||||
|
@ -305,20 +300,20 @@ func TestColSegContainer_addPartition(t *testing.T) {
|
|||
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
|
||||
assert.NotEqual(t, "", collectionMetaBlob)
|
||||
|
||||
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
|
||||
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
|
||||
assert.NoError(t, err)
|
||||
|
||||
collection, err := (*node.container).getCollectionByName(collectionName)
|
||||
collection, err := (*node.replica).getCollectionByName(collectionName)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, collection.meta.Schema.Name, collectionName)
|
||||
assert.Equal(t, collection.meta.ID, collectionID)
|
||||
assert.Equal(t, (*node.container).getCollectionNum(), 1)
|
||||
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
|
||||
|
||||
for _, tag := range collectionMeta.PartitionTags {
|
||||
err := (*node.container).addPartition(collectionID, tag)
|
||||
err := (*node.replica).addPartition(collectionID, tag)
|
||||
assert.NoError(t, err)
|
||||
partition, err := (*node.container).getPartitionByTag(collectionID, tag)
|
||||
partition, err := (*node.replica).getPartitionByTag(collectionID, tag)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, partition.partitionTag, "default")
|
||||
}
|
||||
|
@ -326,8 +321,7 @@ func TestColSegContainer_addPartition(t *testing.T) {
|
|||
|
||||
func TestColSegContainer_removePartition(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
pulsarURL := "pulsar://localhost:6650"
|
||||
node := NewQueryNode(ctx, 0, pulsarURL)
|
||||
node := NewQueryNode(ctx, 0)
|
||||
|
||||
collectionName := "collection0"
|
||||
collectionID := UniqueID(0)
|
||||
|
@ -372,31 +366,30 @@ func TestColSegContainer_removePartition(t *testing.T) {
|
|||
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
|
||||
assert.NotEqual(t, "", collectionMetaBlob)
|
||||
|
||||
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
|
||||
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
|
||||
assert.NoError(t, err)
|
||||
|
||||
collection, err := (*node.container).getCollectionByName(collectionName)
|
||||
collection, err := (*node.replica).getCollectionByName(collectionName)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, collection.meta.Schema.Name, collectionName)
|
||||
assert.Equal(t, collection.meta.ID, collectionID)
|
||||
assert.Equal(t, (*node.container).getCollectionNum(), 1)
|
||||
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
|
||||
|
||||
for _, tag := range collectionMeta.PartitionTags {
|
||||
err := (*node.container).addPartition(collectionID, tag)
|
||||
err := (*node.replica).addPartition(collectionID, tag)
|
||||
assert.NoError(t, err)
|
||||
partition, err := (*node.container).getPartitionByTag(collectionID, tag)
|
||||
partition, err := (*node.replica).getPartitionByTag(collectionID, tag)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, partition.partitionTag, partitionTag)
|
||||
err = (*node.container).removePartition(collectionID, partitionTag)
|
||||
err = (*node.replica).removePartition(collectionID, partitionTag)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestColSegContainer_getPartitionByTag(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
pulsarURL := "pulsar://localhost:6650"
|
||||
node := NewQueryNode(ctx, 0, pulsarURL)
|
||||
node := NewQueryNode(ctx, 0)
|
||||
|
||||
collectionName := "collection0"
|
||||
collectionID := UniqueID(0)
|
||||
|
@ -440,20 +433,20 @@ func TestColSegContainer_getPartitionByTag(t *testing.T) {
|
|||
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
|
||||
assert.NotEqual(t, "", collectionMetaBlob)
|
||||
|
||||
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
|
||||
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
|
||||
assert.NoError(t, err)
|
||||
|
||||
collection, err := (*node.container).getCollectionByName(collectionName)
|
||||
collection, err := (*node.replica).getCollectionByName(collectionName)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, collection.meta.Schema.Name, collectionName)
|
||||
assert.Equal(t, collection.meta.ID, collectionID)
|
||||
assert.Equal(t, (*node.container).getCollectionNum(), 1)
|
||||
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
|
||||
|
||||
for _, tag := range collectionMeta.PartitionTags {
|
||||
err := (*node.container).addPartition(collectionID, tag)
|
||||
err := (*node.replica).addPartition(collectionID, tag)
|
||||
assert.NoError(t, err)
|
||||
partition, err := (*node.container).getPartitionByTag(collectionID, tag)
|
||||
partition, err := (*node.replica).getPartitionByTag(collectionID, tag)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, partition.partitionTag, "default")
|
||||
assert.NotNil(t, partition)
|
||||
|
@ -463,8 +456,7 @@ func TestColSegContainer_getPartitionByTag(t *testing.T) {
|
|||
//----------------------------------------------------------------------------------------------------- segment
|
||||
func TestColSegContainer_addSegment(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
pulsarURL := "pulsar://localhost:6650"
|
||||
node := NewQueryNode(ctx, 0, pulsarURL)
|
||||
node := NewQueryNode(ctx, 0)
|
||||
|
||||
collectionName := "collection0"
|
||||
collectionID := UniqueID(0)
|
||||
|
@ -508,24 +500,24 @@ func TestColSegContainer_addSegment(t *testing.T) {
|
|||
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
|
||||
assert.NotEqual(t, "", collectionMetaBlob)
|
||||
|
||||
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
|
||||
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
|
||||
assert.NoError(t, err)
|
||||
|
||||
collection, err := (*node.container).getCollectionByName(collectionName)
|
||||
collection, err := (*node.replica).getCollectionByName(collectionName)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, collection.meta.Schema.Name, collectionName)
|
||||
assert.Equal(t, collection.meta.ID, UniqueID(0))
|
||||
assert.Equal(t, (*node.container).getCollectionNum(), 1)
|
||||
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
|
||||
|
||||
err = (*node.container).addPartition(collectionID, collectionMeta.PartitionTags[0])
|
||||
err = (*node.replica).addPartition(collectionID, collectionMeta.PartitionTags[0])
|
||||
assert.NoError(t, err)
|
||||
|
||||
const segmentNum = 3
|
||||
for i := 0; i < segmentNum; i++ {
|
||||
err := (*node.container).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID)
|
||||
err := (*node.replica).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID)
|
||||
assert.NoError(t, err)
|
||||
targetSeg, err := (*node.container).getSegmentByID(UniqueID(i))
|
||||
targetSeg, err := (*node.replica).getSegmentByID(UniqueID(i))
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, targetSeg.segmentID, UniqueID(i))
|
||||
}
|
||||
|
@ -533,8 +525,7 @@ func TestColSegContainer_addSegment(t *testing.T) {
|
|||
|
||||
func TestColSegContainer_removeSegment(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
pulsarURL := "pulsar://localhost:6650"
|
||||
node := NewQueryNode(ctx, 0, pulsarURL)
|
||||
node := NewQueryNode(ctx, 0)
|
||||
|
||||
collectionName := "collection0"
|
||||
collectionID := UniqueID(0)
|
||||
|
@ -578,35 +569,34 @@ func TestColSegContainer_removeSegment(t *testing.T) {
|
|||
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
|
||||
assert.NotEqual(t, "", collectionMetaBlob)
|
||||
|
||||
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
|
||||
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
|
||||
assert.NoError(t, err)
|
||||
|
||||
collection, err := (*node.container).getCollectionByName(collectionName)
|
||||
collection, err := (*node.replica).getCollectionByName(collectionName)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, collection.meta.Schema.Name, collectionName)
|
||||
assert.Equal(t, collection.meta.ID, UniqueID(0))
|
||||
assert.Equal(t, (*node.container).getCollectionNum(), 1)
|
||||
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
|
||||
|
||||
err = (*node.container).addPartition(collectionID, collectionMeta.PartitionTags[0])
|
||||
err = (*node.replica).addPartition(collectionID, collectionMeta.PartitionTags[0])
|
||||
assert.NoError(t, err)
|
||||
|
||||
const segmentNum = 3
|
||||
for i := 0; i < segmentNum; i++ {
|
||||
err := (*node.container).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID)
|
||||
err := (*node.replica).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID)
|
||||
assert.NoError(t, err)
|
||||
targetSeg, err := (*node.container).getSegmentByID(UniqueID(i))
|
||||
targetSeg, err := (*node.replica).getSegmentByID(UniqueID(i))
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, targetSeg.segmentID, UniqueID(i))
|
||||
err = (*node.container).removeSegment(UniqueID(i))
|
||||
err = (*node.replica).removeSegment(UniqueID(i))
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestColSegContainer_getSegmentByID(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
pulsarURL := "pulsar://localhost:6650"
|
||||
node := NewQueryNode(ctx, 0, pulsarURL)
|
||||
node := NewQueryNode(ctx, 0)
|
||||
|
||||
collectionName := "collection0"
|
||||
collectionID := UniqueID(0)
|
||||
|
@ -650,24 +640,24 @@ func TestColSegContainer_getSegmentByID(t *testing.T) {
|
|||
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
|
||||
assert.NotEqual(t, "", collectionMetaBlob)
|
||||
|
||||
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
|
||||
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
|
||||
assert.NoError(t, err)
|
||||
|
||||
collection, err := (*node.container).getCollectionByName(collectionName)
|
||||
collection, err := (*node.replica).getCollectionByName(collectionName)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, collection.meta.Schema.Name, collectionName)
|
||||
assert.Equal(t, collection.meta.ID, UniqueID(0))
|
||||
assert.Equal(t, (*node.container).getCollectionNum(), 1)
|
||||
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
|
||||
|
||||
err = (*node.container).addPartition(collectionID, collectionMeta.PartitionTags[0])
|
||||
err = (*node.replica).addPartition(collectionID, collectionMeta.PartitionTags[0])
|
||||
assert.NoError(t, err)
|
||||
|
||||
const segmentNum = 3
|
||||
for i := 0; i < segmentNum; i++ {
|
||||
err := (*node.container).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID)
|
||||
err := (*node.replica).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID)
|
||||
assert.NoError(t, err)
|
||||
targetSeg, err := (*node.container).getSegmentByID(UniqueID(i))
|
||||
targetSeg, err := (*node.replica).getSegmentByID(UniqueID(i))
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, targetSeg.segmentID, UniqueID(i))
|
||||
}
|
||||
|
@ -675,8 +665,7 @@ func TestColSegContainer_getSegmentByID(t *testing.T) {
|
|||
|
||||
func TestColSegContainer_hasSegment(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
pulsarURL := "pulsar://localhost:6650"
|
||||
node := NewQueryNode(ctx, 0, pulsarURL)
|
||||
node := NewQueryNode(ctx, 0)
|
||||
|
||||
collectionName := "collection0"
|
||||
collectionID := UniqueID(0)
|
||||
|
@ -720,29 +709,29 @@ func TestColSegContainer_hasSegment(t *testing.T) {
|
|||
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
|
||||
assert.NotEqual(t, "", collectionMetaBlob)
|
||||
|
||||
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
|
||||
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
|
||||
assert.NoError(t, err)
|
||||
|
||||
collection, err := (*node.container).getCollectionByName(collectionName)
|
||||
collection, err := (*node.replica).getCollectionByName(collectionName)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, collection.meta.Schema.Name, collectionName)
|
||||
assert.Equal(t, collection.meta.ID, UniqueID(0))
|
||||
assert.Equal(t, (*node.container).getCollectionNum(), 1)
|
||||
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
|
||||
|
||||
err = (*node.container).addPartition(collectionID, collectionMeta.PartitionTags[0])
|
||||
err = (*node.replica).addPartition(collectionID, collectionMeta.PartitionTags[0])
|
||||
assert.NoError(t, err)
|
||||
|
||||
const segmentNum = 3
|
||||
for i := 0; i < segmentNum; i++ {
|
||||
err := (*node.container).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID)
|
||||
err := (*node.replica).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID)
|
||||
assert.NoError(t, err)
|
||||
targetSeg, err := (*node.container).getSegmentByID(UniqueID(i))
|
||||
targetSeg, err := (*node.replica).getSegmentByID(UniqueID(i))
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, targetSeg.segmentID, UniqueID(i))
|
||||
hasSeg := (*node.container).hasSegment(UniqueID(i))
|
||||
hasSeg := (*node.replica).hasSegment(UniqueID(i))
|
||||
assert.Equal(t, hasSeg, true)
|
||||
hasSeg = (*node.container).hasSegment(UniqueID(i + 100))
|
||||
hasSeg = (*node.replica).hasSegment(UniqueID(i + 100))
|
||||
assert.Equal(t, hasSeg, false)
|
||||
}
|
||||
}
|
|
@ -13,8 +13,7 @@ import (
|
|||
|
||||
func TestCollection_Partitions(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
pulsarURL := "pulsar://localhost:6650"
|
||||
node := NewQueryNode(ctx, 0, pulsarURL)
|
||||
node := NewQueryNode(ctx, 0)
|
||||
|
||||
collectionName := "collection0"
|
||||
fieldVec := schemapb.FieldSchema{
|
||||
|
@ -57,18 +56,18 @@ func TestCollection_Partitions(t *testing.T) {
|
|||
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
|
||||
assert.NotEqual(t, "", collectionMetaBlob)
|
||||
|
||||
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
|
||||
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
|
||||
assert.NoError(t, err)
|
||||
|
||||
collection, err := (*node.container).getCollectionByName(collectionName)
|
||||
collection, err := (*node.replica).getCollectionByName(collectionName)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, collection.meta.Schema.Name, "collection0")
|
||||
assert.Equal(t, collection.meta.ID, UniqueID(0))
|
||||
assert.Equal(t, (*node.container).getCollectionNum(), 1)
|
||||
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
|
||||
|
||||
for _, tag := range collectionMeta.PartitionTags {
|
||||
err := (*node.container).addPartition(collection.ID(), tag)
|
||||
err := (*node.replica).addPartition(collection.ID(), tag)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
|
|
|
@ -4,33 +4,23 @@ import (
|
|||
"context"
|
||||
"log"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/msgstream"
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
|
||||
)
|
||||
|
||||
type dataSyncService struct {
|
||||
ctx context.Context
|
||||
pulsarURL string
|
||||
fg *flowgraph.TimeTickedFlowGraph
|
||||
ctx context.Context
|
||||
fg *flowgraph.TimeTickedFlowGraph
|
||||
|
||||
// input streams
|
||||
dmStream *msgstream.MsgStream
|
||||
// ddStream *msgstream.MsgStream
|
||||
// k2sStream *msgstream.MsgStream
|
||||
|
||||
node *QueryNode
|
||||
replica *collectionReplica
|
||||
}
|
||||
|
||||
func newDataSyncService(ctx context.Context, node *QueryNode, pulsarURL string) *dataSyncService {
|
||||
func newDataSyncService(ctx context.Context, replica *collectionReplica) *dataSyncService {
|
||||
|
||||
return &dataSyncService{
|
||||
ctx: ctx,
|
||||
pulsarURL: pulsarURL,
|
||||
fg: nil,
|
||||
ctx: ctx,
|
||||
fg: nil,
|
||||
|
||||
dmStream: nil,
|
||||
|
||||
node: node,
|
||||
replica: replica,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -41,7 +31,6 @@ func (dsService *dataSyncService) start() {
|
|||
|
||||
func (dsService *dataSyncService) close() {
|
||||
dsService.fg.Close()
|
||||
(*dsService.dmStream).Close()
|
||||
}
|
||||
|
||||
func (dsService *dataSyncService) initNodes() {
|
||||
|
@ -49,10 +38,10 @@ func (dsService *dataSyncService) initNodes() {
|
|||
|
||||
dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx)
|
||||
|
||||
var dmStreamNode Node = newDmInputNode(dsService.ctx, dsService.pulsarURL)
|
||||
var dmStreamNode Node = newDmInputNode(dsService.ctx)
|
||||
var filterDmNode Node = newFilteredDmNode()
|
||||
var insertNode Node = newInsertNode(dsService.node.container)
|
||||
var serviceTimeNode Node = newServiceTimeNode(dsService.node)
|
||||
var insertNode Node = newInsertNode(dsService.replica)
|
||||
var serviceTimeNode Node = newServiceTimeNode(dsService.replica)
|
||||
|
||||
dsService.fg.AddNode(&dmStreamNode)
|
||||
dsService.fg.AddNode(&filterDmNode)
|
||||
|
@ -90,21 +79,4 @@ func (dsService *dataSyncService) initNodes() {
|
|||
if err != nil {
|
||||
log.Fatal("set edges failed in node:", serviceTimeNode.Name())
|
||||
}
|
||||
|
||||
dsService.setDmStream(&dmStreamNode)
|
||||
}
|
||||
|
||||
func (dsService *dataSyncService) setDmStream(node *Node) {
|
||||
if (*node).IsInputNode() {
|
||||
inStream, ok := (*node).(*InputNode)
|
||||
dsService.dmStream = inStream.InStream()
|
||||
if !ok {
|
||||
log.Fatal("Invalid inputNode")
|
||||
}
|
||||
} else {
|
||||
log.Fatal("stream set failed")
|
||||
}
|
||||
}
|
||||
|
||||
func (dsService *dataSyncService) setDdStream(node *Node) {}
|
||||
func (dsService *dataSyncService) setK2sStream(node *Node) {}
|
||||
|
|
|
@ -19,6 +19,7 @@ import (
|
|||
|
||||
// NOTE: start pulsar before test
|
||||
func TestManipulationService_Start(t *testing.T) {
|
||||
Params.Init()
|
||||
var ctx context.Context
|
||||
|
||||
if closeWithDeadline {
|
||||
|
@ -32,7 +33,7 @@ func TestManipulationService_Start(t *testing.T) {
|
|||
|
||||
// init query node
|
||||
pulsarURL := "pulsar://localhost:6650"
|
||||
node := NewQueryNode(ctx, 0, pulsarURL)
|
||||
node := NewQueryNode(ctx, 0)
|
||||
|
||||
// init meta
|
||||
collectionName := "collection0"
|
||||
|
@ -76,20 +77,20 @@ func TestManipulationService_Start(t *testing.T) {
|
|||
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
|
||||
assert.NotEqual(t, "", collectionMetaBlob)
|
||||
|
||||
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
|
||||
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
|
||||
assert.NoError(t, err)
|
||||
|
||||
collection, err := (*node.container).getCollectionByName(collectionName)
|
||||
collection, err := (*node.replica).getCollectionByName(collectionName)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, collection.meta.Schema.Name, "collection0")
|
||||
assert.Equal(t, collection.meta.ID, UniqueID(0))
|
||||
assert.Equal(t, (*node.container).getCollectionNum(), 1)
|
||||
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
|
||||
|
||||
err = (*node.container).addPartition(collection.ID(), collectionMeta.PartitionTags[0])
|
||||
err = (*node.replica).addPartition(collection.ID(), collectionMeta.PartitionTags[0])
|
||||
assert.NoError(t, err)
|
||||
|
||||
segmentID := UniqueID(0)
|
||||
err = (*node.container).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0))
|
||||
err = (*node.replica).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0))
|
||||
assert.NoError(t, err)
|
||||
|
||||
// test data generate
|
||||
|
@ -168,7 +169,7 @@ func TestManipulationService_Start(t *testing.T) {
|
|||
assert.NoError(t, err)
|
||||
|
||||
// dataSync
|
||||
node.dataSyncService = newDataSyncService(node.ctx, node, node.pulsarURL)
|
||||
node.dataSyncService = newDataSyncService(node.ctx, node.replica)
|
||||
go node.dataSyncService.start()
|
||||
|
||||
node.Close()
|
||||
|
|
|
@ -10,7 +10,7 @@ import (
|
|||
|
||||
type insertNode struct {
|
||||
BaseNode
|
||||
container *container
|
||||
replica *collectionReplica
|
||||
}
|
||||
|
||||
type InsertData struct {
|
||||
|
@ -58,13 +58,13 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg {
|
|||
insertData.insertRecords[task.SegmentID] = append(insertData.insertRecords[task.SegmentID], task.RowData...)
|
||||
|
||||
// check if segment exists, if not, create this segment
|
||||
if !(*iNode.container).hasSegment(task.SegmentID) {
|
||||
collection, err := (*iNode.container).getCollectionByName(task.CollectionName)
|
||||
if !(*iNode.replica).hasSegment(task.SegmentID) {
|
||||
collection, err := (*iNode.replica).getCollectionByName(task.CollectionName)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
continue
|
||||
}
|
||||
err = (*iNode.container).addSegment(task.SegmentID, task.PartitionTag, collection.ID())
|
||||
err = (*iNode.replica).addSegment(task.SegmentID, task.PartitionTag, collection.ID())
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
continue
|
||||
|
@ -74,7 +74,7 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg {
|
|||
|
||||
// 2. do preInsert
|
||||
for segmentID := range insertData.insertRecords {
|
||||
var targetSegment, err = (*iNode.container).getSegmentByID(segmentID)
|
||||
var targetSegment, err = (*iNode.replica).getSegmentByID(segmentID)
|
||||
if err != nil {
|
||||
log.Println("preInsert failed")
|
||||
// TODO: add error handling
|
||||
|
@ -102,7 +102,7 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg {
|
|||
}
|
||||
|
||||
func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *sync.WaitGroup) {
|
||||
var targetSegment, err = (*iNode.container).getSegmentByID(segmentID)
|
||||
var targetSegment, err = (*iNode.replica).getSegmentByID(segmentID)
|
||||
if err != nil {
|
||||
log.Println("cannot find segment:", segmentID)
|
||||
// TODO: add error handling
|
||||
|
@ -125,13 +125,13 @@ func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *syn
|
|||
wg.Done()
|
||||
}
|
||||
|
||||
func newInsertNode(container *container) *insertNode {
|
||||
func newInsertNode(replica *collectionReplica) *insertNode {
|
||||
baseNode := BaseNode{}
|
||||
baseNode.SetMaxQueueLength(maxQueueLength)
|
||||
baseNode.SetMaxParallelism(maxParallelism)
|
||||
|
||||
return &insertNode{
|
||||
BaseNode: baseNode,
|
||||
container: container,
|
||||
BaseNode: baseNode,
|
||||
replica: replica,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,22 +2,28 @@ package reader
|
|||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/msgstream"
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
|
||||
)
|
||||
|
||||
func newDmInputNode(ctx context.Context, pulsarURL string) *flowgraph.InputNode {
|
||||
func newDmInputNode(ctx context.Context) *flowgraph.InputNode {
|
||||
const (
|
||||
receiveBufSize = 1024
|
||||
pulsarBufSize = 1024
|
||||
)
|
||||
|
||||
msgStreamURL, err := Params.PulsarAddress()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
consumeChannels := []string{"insert"}
|
||||
consumeSubName := "insertSub"
|
||||
|
||||
insertStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize)
|
||||
insertStream.SetPulsarCient(pulsarURL)
|
||||
insertStream.SetPulsarCient(msgStreamURL)
|
||||
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
|
||||
insertStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
|
||||
|
||||
|
|
|
@ -6,7 +6,7 @@ import (
|
|||
|
||||
type serviceTimeNode struct {
|
||||
BaseNode
|
||||
node *QueryNode
|
||||
replica *collectionReplica
|
||||
}
|
||||
|
||||
func (stNode *serviceTimeNode) Name() string {
|
||||
|
@ -28,17 +28,17 @@ func (stNode *serviceTimeNode) Operate(in []*Msg) []*Msg {
|
|||
}
|
||||
|
||||
// update service time
|
||||
stNode.node.tSafe.setTSafe(serviceTimeMsg.timeRange.timestampMax)
|
||||
(*stNode.replica).setTSafe(serviceTimeMsg.timeRange.timestampMax)
|
||||
return nil
|
||||
}
|
||||
|
||||
func newServiceTimeNode(node *QueryNode) *serviceTimeNode {
|
||||
func newServiceTimeNode(replica *collectionReplica) *serviceTimeNode {
|
||||
baseNode := BaseNode{}
|
||||
baseNode.SetMaxQueueLength(maxQueueLength)
|
||||
baseNode.SetMaxParallelism(maxParallelism)
|
||||
|
||||
return &serviceTimeNode{
|
||||
BaseNode: baseNode,
|
||||
node: node,
|
||||
replica: replica,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,12 +24,12 @@ const (
|
|||
)
|
||||
|
||||
type metaService struct {
|
||||
ctx context.Context
|
||||
kvBase *kv.EtcdKV
|
||||
container *container
|
||||
ctx context.Context
|
||||
kvBase *kv.EtcdKV
|
||||
replica *collectionReplica
|
||||
}
|
||||
|
||||
func newMetaService(ctx context.Context, container *container) *metaService {
|
||||
func newMetaService(ctx context.Context, replica *collectionReplica) *metaService {
|
||||
ETCDAddr, err := Params.EtcdAddress()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
@ -46,9 +46,9 @@ func newMetaService(ctx context.Context, container *container) *metaService {
|
|||
})
|
||||
|
||||
return &metaService{
|
||||
ctx: ctx,
|
||||
kvBase: kv.NewEtcdKV(cli, ETCDRootPath),
|
||||
container: container,
|
||||
ctx: ctx,
|
||||
kvBase: kv.NewEtcdKV(cli, ETCDRootPath),
|
||||
replica: replica,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -164,12 +164,12 @@ func (mService *metaService) processCollectionCreate(id string, value string) {
|
|||
|
||||
col := mService.collectionUnmarshal(value)
|
||||
if col != nil {
|
||||
err := (*mService.container).addCollection(col, value)
|
||||
err := (*mService.replica).addCollection(col, value)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
for _, partitionTag := range col.PartitionTags {
|
||||
err = (*mService.container).addPartition(col.ID, partitionTag)
|
||||
err = (*mService.replica).addPartition(col.ID, partitionTag)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
|
@ -187,7 +187,7 @@ func (mService *metaService) processSegmentCreate(id string, value string) {
|
|||
|
||||
// TODO: what if seg == nil? We need to notify master and return rpc request failed
|
||||
if seg != nil {
|
||||
err := (*mService.container).addSegment(seg.SegmentID, seg.PartitionTag, seg.CollectionID)
|
||||
err := (*mService.replica).addSegment(seg.SegmentID, seg.PartitionTag, seg.CollectionID)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return
|
||||
|
@ -216,7 +216,7 @@ func (mService *metaService) processSegmentModify(id string, value string) {
|
|||
}
|
||||
|
||||
if seg != nil {
|
||||
targetSegment, err := (*mService.container).getSegmentByID(seg.SegmentID)
|
||||
targetSegment, err := (*mService.replica).getSegmentByID(seg.SegmentID)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return
|
||||
|
@ -251,7 +251,7 @@ func (mService *metaService) processSegmentDelete(id string) {
|
|||
log.Println("Cannot parse segment id:" + id)
|
||||
}
|
||||
|
||||
err = (*mService.container).removeSegment(segmentID)
|
||||
err = (*mService.replica).removeSegment(segmentID)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return
|
||||
|
@ -266,7 +266,7 @@ func (mService *metaService) processCollectionDelete(id string) {
|
|||
log.Println("Cannot parse collection id:" + id)
|
||||
}
|
||||
|
||||
err = (*mService.container).removeCollection(collectionID)
|
||||
err = (*mService.replica).removeCollection(collectionID)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return
|
||||
|
|
|
@ -27,9 +27,8 @@ func TestMetaService_start(t *testing.T) {
|
|||
}
|
||||
|
||||
// init query node
|
||||
pulsarURL := "pulsar://localhost:6650"
|
||||
node := NewQueryNode(ctx, 0, pulsarURL)
|
||||
node.metaService = newMetaService(ctx, node.container)
|
||||
node := NewQueryNode(ctx, 0)
|
||||
node.metaService = newMetaService(ctx, node.replica)
|
||||
|
||||
(*node.metaService).start()
|
||||
}
|
||||
|
@ -187,9 +186,8 @@ func TestMetaService_processCollectionCreate(t *testing.T) {
|
|||
defer cancel()
|
||||
|
||||
// init metaService
|
||||
pulsarURL := "pulsar://localhost:6650"
|
||||
node := NewQueryNode(ctx, 0, pulsarURL)
|
||||
node.metaService = newMetaService(ctx, node.container)
|
||||
node := NewQueryNode(ctx, 0)
|
||||
node.metaService = newMetaService(ctx, node.replica)
|
||||
|
||||
id := "0"
|
||||
value := `schema: <
|
||||
|
@ -217,10 +215,10 @@ func TestMetaService_processCollectionCreate(t *testing.T) {
|
|||
|
||||
node.metaService.processCollectionCreate(id, value)
|
||||
|
||||
collectionNum := (*node.container).getCollectionNum()
|
||||
collectionNum := (*node.replica).getCollectionNum()
|
||||
assert.Equal(t, collectionNum, 1)
|
||||
|
||||
collection, err := (*node.container).getCollectionByName("test")
|
||||
collection, err := (*node.replica).getCollectionByName("test")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, collection.ID(), UniqueID(0))
|
||||
}
|
||||
|
@ -233,9 +231,8 @@ func TestMetaService_processSegmentCreate(t *testing.T) {
|
|||
defer cancel()
|
||||
|
||||
// init metaService
|
||||
pulsarURL := "pulsar://localhost:6650"
|
||||
node := NewQueryNode(ctx, 0, pulsarURL)
|
||||
node.metaService = newMetaService(ctx, node.container)
|
||||
node := NewQueryNode(ctx, 0)
|
||||
node.metaService = newMetaService(ctx, node.replica)
|
||||
|
||||
collectionName := "collection0"
|
||||
fieldVec := schemapb.FieldSchema{
|
||||
|
@ -278,10 +275,10 @@ func TestMetaService_processSegmentCreate(t *testing.T) {
|
|||
colMetaBlob, err := proto.Marshal(&collectionMeta)
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = (*node.container).addCollection(&collectionMeta, string(colMetaBlob))
|
||||
err = (*node.replica).addCollection(&collectionMeta, string(colMetaBlob))
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = (*node.container).addPartition(UniqueID(0), "default")
|
||||
err = (*node.replica).addPartition(UniqueID(0), "default")
|
||||
assert.NoError(t, err)
|
||||
|
||||
id := "0"
|
||||
|
@ -293,7 +290,7 @@ func TestMetaService_processSegmentCreate(t *testing.T) {
|
|||
|
||||
(*node.metaService).processSegmentCreate(id, value)
|
||||
|
||||
s, err := (*node.container).getSegmentByID(UniqueID(0))
|
||||
s, err := (*node.replica).getSegmentByID(UniqueID(0))
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, s.segmentID, UniqueID(0))
|
||||
}
|
||||
|
@ -306,9 +303,8 @@ func TestMetaService_processCreate(t *testing.T) {
|
|||
defer cancel()
|
||||
|
||||
// init metaService
|
||||
pulsarURL := "pulsar://localhost:6650"
|
||||
node := NewQueryNode(ctx, 0, pulsarURL)
|
||||
node.metaService = newMetaService(ctx, node.container)
|
||||
node := NewQueryNode(ctx, 0)
|
||||
node.metaService = newMetaService(ctx, node.replica)
|
||||
|
||||
key1 := "by-dev/collection/0"
|
||||
msg1 := `schema: <
|
||||
|
@ -335,10 +331,10 @@ func TestMetaService_processCreate(t *testing.T) {
|
|||
`
|
||||
|
||||
(*node.metaService).processCreate(key1, msg1)
|
||||
collectionNum := (*node.container).getCollectionNum()
|
||||
collectionNum := (*node.replica).getCollectionNum()
|
||||
assert.Equal(t, collectionNum, 1)
|
||||
|
||||
collection, err := (*node.container).getCollectionByName("test")
|
||||
collection, err := (*node.replica).getCollectionByName("test")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, collection.ID(), UniqueID(0))
|
||||
|
||||
|
@ -350,7 +346,7 @@ func TestMetaService_processCreate(t *testing.T) {
|
|||
`
|
||||
|
||||
(*node.metaService).processCreate(key2, msg2)
|
||||
s, err := (*node.container).getSegmentByID(UniqueID(0))
|
||||
s, err := (*node.replica).getSegmentByID(UniqueID(0))
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, s.segmentID, UniqueID(0))
|
||||
}
|
||||
|
@ -363,9 +359,8 @@ func TestMetaService_processSegmentModify(t *testing.T) {
|
|||
defer cancel()
|
||||
|
||||
// init metaService
|
||||
pulsarURL := "pulsar://localhost:6650"
|
||||
node := NewQueryNode(ctx, 0, pulsarURL)
|
||||
node.metaService = newMetaService(ctx, node.container)
|
||||
node := NewQueryNode(ctx, 0)
|
||||
node.metaService = newMetaService(ctx, node.replica)
|
||||
|
||||
collectionName := "collection0"
|
||||
fieldVec := schemapb.FieldSchema{
|
||||
|
@ -408,10 +403,10 @@ func TestMetaService_processSegmentModify(t *testing.T) {
|
|||
colMetaBlob, err := proto.Marshal(&collectionMeta)
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = (*node.container).addCollection(&collectionMeta, string(colMetaBlob))
|
||||
err = (*node.replica).addCollection(&collectionMeta, string(colMetaBlob))
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = (*node.container).addPartition(UniqueID(0), "default")
|
||||
err = (*node.replica).addPartition(UniqueID(0), "default")
|
||||
assert.NoError(t, err)
|
||||
|
||||
id := "0"
|
||||
|
@ -422,7 +417,7 @@ func TestMetaService_processSegmentModify(t *testing.T) {
|
|||
`
|
||||
|
||||
(*node.metaService).processSegmentCreate(id, value)
|
||||
s, err := (*node.container).getSegmentByID(UniqueID(0))
|
||||
s, err := (*node.replica).getSegmentByID(UniqueID(0))
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, s.segmentID, UniqueID(0))
|
||||
|
||||
|
@ -434,7 +429,7 @@ func TestMetaService_processSegmentModify(t *testing.T) {
|
|||
|
||||
// TODO: modify segment for testing processCollectionModify
|
||||
(*node.metaService).processSegmentModify(id, newValue)
|
||||
seg, err := (*node.container).getSegmentByID(UniqueID(0))
|
||||
seg, err := (*node.replica).getSegmentByID(UniqueID(0))
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, seg.segmentID, UniqueID(0))
|
||||
}
|
||||
|
@ -447,9 +442,8 @@ func TestMetaService_processCollectionModify(t *testing.T) {
|
|||
defer cancel()
|
||||
|
||||
// init metaService
|
||||
pulsarURL := "pulsar://localhost:6650"
|
||||
node := NewQueryNode(ctx, 0, pulsarURL)
|
||||
node.metaService = newMetaService(ctx, node.container)
|
||||
node := NewQueryNode(ctx, 0)
|
||||
node.metaService = newMetaService(ctx, node.replica)
|
||||
|
||||
id := "0"
|
||||
value := `schema: <
|
||||
|
@ -476,10 +470,10 @@ func TestMetaService_processCollectionModify(t *testing.T) {
|
|||
`
|
||||
|
||||
(*node.metaService).processCollectionCreate(id, value)
|
||||
collectionNum := (*node.container).getCollectionNum()
|
||||
collectionNum := (*node.replica).getCollectionNum()
|
||||
assert.Equal(t, collectionNum, 1)
|
||||
|
||||
collection, err := (*node.container).getCollectionByName("test")
|
||||
collection, err := (*node.replica).getCollectionByName("test")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, collection.ID(), UniqueID(0))
|
||||
|
||||
|
@ -508,7 +502,7 @@ func TestMetaService_processCollectionModify(t *testing.T) {
|
|||
`
|
||||
|
||||
(*node.metaService).processCollectionModify(id, newValue)
|
||||
collection, err = (*node.container).getCollectionByName("test")
|
||||
collection, err = (*node.replica).getCollectionByName("test")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, collection.ID(), UniqueID(0))
|
||||
}
|
||||
|
@ -521,9 +515,8 @@ func TestMetaService_processModify(t *testing.T) {
|
|||
defer cancel()
|
||||
|
||||
// init metaService
|
||||
pulsarURL := "pulsar://localhost:6650"
|
||||
node := NewQueryNode(ctx, 0, pulsarURL)
|
||||
node.metaService = newMetaService(ctx, node.container)
|
||||
node := NewQueryNode(ctx, 0)
|
||||
node.metaService = newMetaService(ctx, node.replica)
|
||||
|
||||
key1 := "by-dev/collection/0"
|
||||
msg1 := `schema: <
|
||||
|
@ -550,10 +543,10 @@ func TestMetaService_processModify(t *testing.T) {
|
|||
`
|
||||
|
||||
(*node.metaService).processCreate(key1, msg1)
|
||||
collectionNum := (*node.container).getCollectionNum()
|
||||
collectionNum := (*node.replica).getCollectionNum()
|
||||
assert.Equal(t, collectionNum, 1)
|
||||
|
||||
collection, err := (*node.container).getCollectionByName("test")
|
||||
collection, err := (*node.replica).getCollectionByName("test")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, collection.ID(), UniqueID(0))
|
||||
|
||||
|
@ -565,7 +558,7 @@ func TestMetaService_processModify(t *testing.T) {
|
|||
`
|
||||
|
||||
(*node.metaService).processCreate(key2, msg2)
|
||||
s, err := (*node.container).getSegmentByID(UniqueID(0))
|
||||
s, err := (*node.replica).getSegmentByID(UniqueID(0))
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, s.segmentID, UniqueID(0))
|
||||
|
||||
|
@ -595,7 +588,7 @@ func TestMetaService_processModify(t *testing.T) {
|
|||
`
|
||||
|
||||
(*node.metaService).processModify(key1, msg3)
|
||||
collection, err = (*node.container).getCollectionByName("test")
|
||||
collection, err = (*node.replica).getCollectionByName("test")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, collection.ID(), UniqueID(0))
|
||||
|
||||
|
@ -607,7 +600,7 @@ func TestMetaService_processModify(t *testing.T) {
|
|||
|
||||
// TODO: modify segment for testing processCollectionModify
|
||||
(*node.metaService).processModify(key2, msg4)
|
||||
seg, err := (*node.container).getSegmentByID(UniqueID(0))
|
||||
seg, err := (*node.replica).getSegmentByID(UniqueID(0))
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, seg.segmentID, UniqueID(0))
|
||||
}
|
||||
|
@ -620,9 +613,8 @@ func TestMetaService_processSegmentDelete(t *testing.T) {
|
|||
defer cancel()
|
||||
|
||||
// init metaService
|
||||
pulsarURL := "pulsar://localhost:6650"
|
||||
node := NewQueryNode(ctx, 0, pulsarURL)
|
||||
node.metaService = newMetaService(ctx, node.container)
|
||||
node := NewQueryNode(ctx, 0)
|
||||
node.metaService = newMetaService(ctx, node.replica)
|
||||
|
||||
collectionName := "collection0"
|
||||
fieldVec := schemapb.FieldSchema{
|
||||
|
@ -665,10 +657,10 @@ func TestMetaService_processSegmentDelete(t *testing.T) {
|
|||
colMetaBlob, err := proto.Marshal(&collectionMeta)
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = (*node.container).addCollection(&collectionMeta, string(colMetaBlob))
|
||||
err = (*node.replica).addCollection(&collectionMeta, string(colMetaBlob))
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = (*node.container).addPartition(UniqueID(0), "default")
|
||||
err = (*node.replica).addPartition(UniqueID(0), "default")
|
||||
assert.NoError(t, err)
|
||||
|
||||
id := "0"
|
||||
|
@ -679,12 +671,12 @@ func TestMetaService_processSegmentDelete(t *testing.T) {
|
|||
`
|
||||
|
||||
(*node.metaService).processSegmentCreate(id, value)
|
||||
seg, err := (*node.container).getSegmentByID(UniqueID(0))
|
||||
seg, err := (*node.replica).getSegmentByID(UniqueID(0))
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, seg.segmentID, UniqueID(0))
|
||||
|
||||
(*node.metaService).processSegmentDelete("0")
|
||||
mapSize := (*node.container).getSegmentNum()
|
||||
mapSize := (*node.replica).getSegmentNum()
|
||||
assert.Equal(t, mapSize, 0)
|
||||
}
|
||||
|
||||
|
@ -696,9 +688,8 @@ func TestMetaService_processCollectionDelete(t *testing.T) {
|
|||
defer cancel()
|
||||
|
||||
// init metaService
|
||||
pulsarURL := "pulsar://localhost:6650"
|
||||
node := NewQueryNode(ctx, 0, pulsarURL)
|
||||
node.metaService = newMetaService(ctx, node.container)
|
||||
node := NewQueryNode(ctx, 0)
|
||||
node.metaService = newMetaService(ctx, node.replica)
|
||||
|
||||
id := "0"
|
||||
value := `schema: <
|
||||
|
@ -725,15 +716,15 @@ func TestMetaService_processCollectionDelete(t *testing.T) {
|
|||
`
|
||||
|
||||
(*node.metaService).processCollectionCreate(id, value)
|
||||
collectionNum := (*node.container).getCollectionNum()
|
||||
collectionNum := (*node.replica).getCollectionNum()
|
||||
assert.Equal(t, collectionNum, 1)
|
||||
|
||||
collection, err := (*node.container).getCollectionByName("test")
|
||||
collection, err := (*node.replica).getCollectionByName("test")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, collection.ID(), UniqueID(0))
|
||||
|
||||
(*node.metaService).processCollectionDelete(id)
|
||||
collectionNum = (*node.container).getCollectionNum()
|
||||
collectionNum = (*node.replica).getCollectionNum()
|
||||
assert.Equal(t, collectionNum, 0)
|
||||
}
|
||||
|
||||
|
@ -745,9 +736,8 @@ func TestMetaService_processDelete(t *testing.T) {
|
|||
defer cancel()
|
||||
|
||||
// init metaService
|
||||
pulsarURL := "pulsar://localhost:6650"
|
||||
node := NewQueryNode(ctx, 0, pulsarURL)
|
||||
node.metaService = newMetaService(ctx, node.container)
|
||||
node := NewQueryNode(ctx, 0)
|
||||
node.metaService = newMetaService(ctx, node.replica)
|
||||
|
||||
key1 := "by-dev/collection/0"
|
||||
msg1 := `schema: <
|
||||
|
@ -774,10 +764,10 @@ func TestMetaService_processDelete(t *testing.T) {
|
|||
`
|
||||
|
||||
(*node.metaService).processCreate(key1, msg1)
|
||||
collectionNum := (*node.container).getCollectionNum()
|
||||
collectionNum := (*node.replica).getCollectionNum()
|
||||
assert.Equal(t, collectionNum, 1)
|
||||
|
||||
collection, err := (*node.container).getCollectionByName("test")
|
||||
collection, err := (*node.replica).getCollectionByName("test")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, collection.ID(), UniqueID(0))
|
||||
|
||||
|
@ -789,15 +779,15 @@ func TestMetaService_processDelete(t *testing.T) {
|
|||
`
|
||||
|
||||
(*node.metaService).processCreate(key2, msg2)
|
||||
seg, err := (*node.container).getSegmentByID(UniqueID(0))
|
||||
seg, err := (*node.replica).getSegmentByID(UniqueID(0))
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, seg.segmentID, UniqueID(0))
|
||||
|
||||
(*node.metaService).processDelete(key1)
|
||||
collectionsSize := (*node.container).getCollectionNum()
|
||||
collectionsSize := (*node.replica).getCollectionNum()
|
||||
assert.Equal(t, collectionsSize, 0)
|
||||
|
||||
mapSize := (*node.container).getSegmentNum()
|
||||
mapSize := (*node.replica).getSegmentNum()
|
||||
assert.Equal(t, mapSize, 0)
|
||||
}
|
||||
|
||||
|
@ -815,9 +805,8 @@ func TestMetaService_processResp(t *testing.T) {
|
|||
}
|
||||
|
||||
// init metaService
|
||||
pulsarURL := "pulsar://localhost:6650"
|
||||
node := NewQueryNode(ctx, 0, pulsarURL)
|
||||
node.metaService = newMetaService(ctx, node.container)
|
||||
node := NewQueryNode(ctx, 0)
|
||||
node.metaService = newMetaService(ctx, node.replica)
|
||||
|
||||
metaChan := (*node.metaService).kvBase.WatchWithPrefix("")
|
||||
|
||||
|
@ -843,9 +832,8 @@ func TestMetaService_loadCollections(t *testing.T) {
|
|||
}
|
||||
|
||||
// init metaService
|
||||
pulsarURL := "pulsar://localhost:6650"
|
||||
node := NewQueryNode(ctx, 0, pulsarURL)
|
||||
node.metaService = newMetaService(ctx, node.container)
|
||||
node := NewQueryNode(ctx, 0)
|
||||
node.metaService = newMetaService(ctx, node.replica)
|
||||
|
||||
err2 := (*node.metaService).loadCollections()
|
||||
assert.Nil(t, err2)
|
||||
|
@ -865,9 +853,8 @@ func TestMetaService_loadSegments(t *testing.T) {
|
|||
}
|
||||
|
||||
// init metaService
|
||||
pulsarURL := "pulsar://localhost:6650"
|
||||
node := NewQueryNode(ctx, 0, pulsarURL)
|
||||
node.metaService = newMetaService(ctx, node.container)
|
||||
node := NewQueryNode(ctx, 0)
|
||||
node.metaService = newMetaService(ctx, node.replica)
|
||||
|
||||
err2 := (*node.metaService).loadSegments()
|
||||
assert.Nil(t, err2)
|
||||
|
|
|
@ -16,6 +16,23 @@ func (p *ParamTable) InitParamTable() {
|
|||
p.Init()
|
||||
}
|
||||
|
||||
func (p *ParamTable) PulsarAddress() (string, error) {
|
||||
url, err := p.Load("_PulsarAddress")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return "pulsar://" + url, nil
|
||||
}
|
||||
|
||||
func (p *ParamTable) QueryNodeID() int {
|
||||
queryNodeID, _ := p.Load("reader.clientid")
|
||||
id, err := strconv.Atoi(queryNodeID)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return id
|
||||
}
|
||||
|
||||
func (p *ParamTable) TopicStart() int {
|
||||
topicStart, _ := p.Load("reader.topicstart")
|
||||
topicStartNum, err := strconv.Atoi(topicStart)
|
|
@ -0,0 +1,25 @@
|
|||
package reader
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestParamTable_QueryNodeID(t *testing.T) {
|
||||
Params.InitParamTable()
|
||||
id := Params.QueryNodeID()
|
||||
assert.Equal(t, id, 0)
|
||||
}
|
||||
|
||||
func TestParamTable_TopicStart(t *testing.T) {
|
||||
Params.InitParamTable()
|
||||
topicStart := Params.TopicStart()
|
||||
assert.Equal(t, topicStart, 0)
|
||||
}
|
||||
|
||||
func TestParamTable_TopicEnd(t *testing.T) {
|
||||
Params.InitParamTable()
|
||||
topicEnd := Params.TopicEnd()
|
||||
assert.Equal(t, topicEnd, 128)
|
||||
}
|
|
@ -13,8 +13,7 @@ import (
|
|||
|
||||
func TestPartition_Segments(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
pulsarURL := "pulsar://localhost:6650"
|
||||
node := NewQueryNode(ctx, 0, pulsarURL)
|
||||
node := NewQueryNode(ctx, 0)
|
||||
|
||||
collectionName := "collection0"
|
||||
fieldVec := schemapb.FieldSchema{
|
||||
|
@ -57,17 +56,17 @@ func TestPartition_Segments(t *testing.T) {
|
|||
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
|
||||
assert.NotEqual(t, "", collectionMetaBlob)
|
||||
|
||||
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
|
||||
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
|
||||
assert.NoError(t, err)
|
||||
|
||||
collection, err := (*node.container).getCollectionByName(collectionName)
|
||||
collection, err := (*node.replica).getCollectionByName(collectionName)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, collection.meta.Schema.Name, "collection0")
|
||||
assert.Equal(t, collection.meta.ID, UniqueID(0))
|
||||
assert.Equal(t, (*node.container).getCollectionNum(), 1)
|
||||
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
|
||||
|
||||
for _, tag := range collectionMeta.PartitionTags {
|
||||
err := (*node.container).addPartition(collection.ID(), tag)
|
||||
err := (*node.replica).addPartition(collection.ID(), tag)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
|
@ -78,7 +77,7 @@ func TestPartition_Segments(t *testing.T) {
|
|||
|
||||
const segmentNum = 3
|
||||
for i := 0; i < segmentNum; i++ {
|
||||
err := (*node.container).addSegment(UniqueID(i), targetPartition.partitionTag, collection.ID())
|
||||
err := (*node.replica).addSegment(UniqueID(i), targetPartition.partitionTag, collection.ID())
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
|
|
|
@ -14,18 +14,14 @@ import "C"
|
|||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type QueryNode struct {
|
||||
ctx context.Context
|
||||
|
||||
QueryNodeID uint64
|
||||
pulsarURL string
|
||||
|
||||
tSafe tSafe
|
||||
|
||||
container *container
|
||||
replica *collectionReplica
|
||||
|
||||
dataSyncService *dataSyncService
|
||||
metaService *metaService
|
||||
|
@ -33,36 +29,21 @@ type QueryNode struct {
|
|||
statsService *statsService
|
||||
}
|
||||
|
||||
type tSafe interface {
|
||||
getTSafe() Timestamp
|
||||
setTSafe(t Timestamp)
|
||||
}
|
||||
|
||||
type serviceTime struct {
|
||||
tSafeMu sync.Mutex
|
||||
time Timestamp
|
||||
}
|
||||
|
||||
func NewQueryNode(ctx context.Context, queryNodeID uint64, pulsarURL string) *QueryNode {
|
||||
func NewQueryNode(ctx context.Context, queryNodeID uint64) *QueryNode {
|
||||
segmentsMap := make(map[int64]*Segment)
|
||||
collections := make([]*Collection, 0)
|
||||
|
||||
var container container = &colSegContainer{
|
||||
var replica collectionReplica = &collectionReplicaImpl{
|
||||
collections: collections,
|
||||
segments: segmentsMap,
|
||||
}
|
||||
|
||||
var tSafe tSafe = &serviceTime{}
|
||||
|
||||
return &QueryNode{
|
||||
ctx: ctx,
|
||||
|
||||
QueryNodeID: queryNodeID,
|
||||
pulsarURL: pulsarURL,
|
||||
|
||||
tSafe: tSafe,
|
||||
|
||||
container: &container,
|
||||
replica: &replica,
|
||||
|
||||
dataSyncService: nil,
|
||||
metaService: nil,
|
||||
|
@ -72,10 +53,10 @@ func NewQueryNode(ctx context.Context, queryNodeID uint64, pulsarURL string) *Qu
|
|||
}
|
||||
|
||||
func (node *QueryNode) Start() {
|
||||
node.dataSyncService = newDataSyncService(node.ctx, node, node.pulsarURL)
|
||||
node.searchService = newSearchService(node.ctx, node, node.pulsarURL)
|
||||
node.metaService = newMetaService(node.ctx, node.container)
|
||||
node.statsService = newStatsService(node.ctx, node.container, node.pulsarURL)
|
||||
node.dataSyncService = newDataSyncService(node.ctx, node.replica)
|
||||
node.searchService = newSearchService(node.ctx, node.replica)
|
||||
node.metaService = newMetaService(node.ctx, node.replica)
|
||||
node.statsService = newStatsService(node.ctx, node.replica)
|
||||
|
||||
go node.dataSyncService.start()
|
||||
// go node.searchService.start()
|
||||
|
@ -86,15 +67,3 @@ func (node *QueryNode) Start() {
|
|||
func (node *QueryNode) Close() {
|
||||
// TODO: close services
|
||||
}
|
||||
|
||||
func (st *serviceTime) getTSafe() Timestamp {
|
||||
st.tSafeMu.Lock()
|
||||
defer st.tSafeMu.Unlock()
|
||||
return st.time
|
||||
}
|
||||
|
||||
func (st *serviceTime) setTSafe(t Timestamp) {
|
||||
st.tSafeMu.Lock()
|
||||
st.time = t
|
||||
st.tSafeMu.Unlock()
|
||||
}
|
||||
|
|
|
@ -23,10 +23,6 @@ func TestQueryNode_start(t *testing.T) {
|
|||
ctx = context.Background()
|
||||
}
|
||||
|
||||
pulsarAddr, err := Params.PulsarAddress()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
node := NewQueryNode(ctx, 0, "pulsar://"+pulsarAddr)
|
||||
node := NewQueryNode(ctx, 0)
|
||||
node.Start()
|
||||
}
|
||||
|
|
|
@ -8,8 +8,8 @@ func Init() {
|
|||
Params.Init()
|
||||
}
|
||||
|
||||
func StartQueryNode(ctx context.Context, pulsarURL string) {
|
||||
node := NewQueryNode(ctx, 0, pulsarURL)
|
||||
func StartQueryNode(ctx context.Context) {
|
||||
node := NewQueryNode(ctx, 0)
|
||||
|
||||
node.Start()
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"sort"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
|
@ -19,7 +20,7 @@ type searchService struct {
|
|||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
|
||||
node *QueryNode
|
||||
replica *collectionReplica
|
||||
searchMsgStream *msgstream.MsgStream
|
||||
searchResultMsgStream *msgstream.MsgStream
|
||||
}
|
||||
|
@ -31,24 +32,29 @@ type SearchResult struct {
|
|||
ResultDistances []float32
|
||||
}
|
||||
|
||||
func newSearchService(ctx context.Context, node *QueryNode, pulsarURL string) *searchService {
|
||||
func newSearchService(ctx context.Context, replica *collectionReplica) *searchService {
|
||||
const (
|
||||
//TODO:: read config file
|
||||
receiveBufSize = 1024
|
||||
pulsarBufSize = 1024
|
||||
)
|
||||
|
||||
msgStreamURL, err := Params.PulsarAddress()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
consumeChannels := []string{"search"}
|
||||
consumeSubName := "subSearch"
|
||||
searchStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize)
|
||||
searchStream.SetPulsarCient(pulsarURL)
|
||||
searchStream.SetPulsarCient(msgStreamURL)
|
||||
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
|
||||
searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
|
||||
var inputStream msgstream.MsgStream = searchStream
|
||||
|
||||
producerChannels := []string{"searchResult"}
|
||||
searchResultStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize)
|
||||
searchResultStream.SetPulsarCient(pulsarURL)
|
||||
searchResultStream.SetPulsarCient(msgStreamURL)
|
||||
searchResultStream.CreatePulsarProducers(producerChannels)
|
||||
var outputStream msgstream.MsgStream = searchResultStream
|
||||
|
||||
|
@ -57,7 +63,7 @@ func newSearchService(ctx context.Context, node *QueryNode, pulsarURL string) *s
|
|||
ctx: searchServiceCtx,
|
||||
cancel: searchServiceCancel,
|
||||
|
||||
node: node,
|
||||
replica: replica,
|
||||
searchMsgStream: &inputStream,
|
||||
searchResultMsgStream: &outputStream,
|
||||
}
|
||||
|
@ -120,7 +126,7 @@ func (ss *searchService) search(searchMessages []msgstream.TsMsg) error {
|
|||
}
|
||||
collectionName := query.CollectionName
|
||||
partitionTags := query.PartitionTags
|
||||
collection, err := (*ss.node.container).getCollectionByName(collectionName)
|
||||
collection, err := (*ss.replica).getCollectionByName(collectionName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -150,7 +156,7 @@ func (ss *searchService) search(searchMessages []msgstream.TsMsg) error {
|
|||
|
||||
// 3. Do search in all segments
|
||||
for _, partitionTag := range partitionTags {
|
||||
partition, err := (*ss.node.container).getPartitionByTag(collectionID, partitionTag)
|
||||
partition, err := (*ss.replica).getPartitionByTag(collectionID, partitionTag)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -21,12 +21,13 @@ import (
|
|||
)
|
||||
|
||||
func TestSearch_Search(t *testing.T) {
|
||||
Params.Init()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// init query node
|
||||
pulsarURL := "pulsar://localhost:6650"
|
||||
node := NewQueryNode(ctx, 0, pulsarURL)
|
||||
node := NewQueryNode(ctx, 0)
|
||||
|
||||
// init meta
|
||||
collectionName := "collection0"
|
||||
|
@ -70,20 +71,20 @@ func TestSearch_Search(t *testing.T) {
|
|||
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
|
||||
assert.NotEqual(t, "", collectionMetaBlob)
|
||||
|
||||
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
|
||||
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
|
||||
assert.NoError(t, err)
|
||||
|
||||
collection, err := (*node.container).getCollectionByName(collectionName)
|
||||
collection, err := (*node.replica).getCollectionByName(collectionName)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, collection.meta.Schema.Name, "collection0")
|
||||
assert.Equal(t, collection.meta.ID, UniqueID(0))
|
||||
assert.Equal(t, (*node.container).getCollectionNum(), 1)
|
||||
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
|
||||
|
||||
err = (*node.container).addPartition(collection.ID(), collectionMeta.PartitionTags[0])
|
||||
err = (*node.replica).addPartition(collection.ID(), collectionMeta.PartitionTags[0])
|
||||
assert.NoError(t, err)
|
||||
|
||||
segmentID := UniqueID(0)
|
||||
err = (*node.container).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0))
|
||||
err = (*node.replica).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0))
|
||||
assert.NoError(t, err)
|
||||
|
||||
// test data generate
|
||||
|
@ -162,7 +163,7 @@ func TestSearch_Search(t *testing.T) {
|
|||
assert.NoError(t, err)
|
||||
|
||||
// dataSync
|
||||
node.dataSyncService = newDataSyncService(node.ctx, node, node.pulsarURL)
|
||||
node.dataSyncService = newDataSyncService(node.ctx, node.replica)
|
||||
go node.dataSyncService.start()
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
|
@ -233,7 +234,7 @@ func TestSearch_Search(t *testing.T) {
|
|||
err = searchMsgStream.Produce(&msgPackSearch)
|
||||
assert.NoError(t, err)
|
||||
|
||||
node.searchService = newSearchService(node.ctx, node, node.pulsarURL)
|
||||
node.searchService = newSearchService(node.ctx, node.replica)
|
||||
go node.searchService.start()
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
|
|
|
@ -13,21 +13,17 @@ import (
|
|||
)
|
||||
|
||||
type statsService struct {
|
||||
ctx context.Context
|
||||
pulsarURL string
|
||||
|
||||
msgStream *msgstream.MsgStream
|
||||
|
||||
container *container
|
||||
ctx context.Context
|
||||
statsStream *msgstream.MsgStream
|
||||
replica *collectionReplica
|
||||
}
|
||||
|
||||
func newStatsService(ctx context.Context, container *container, pulsarURL string) *statsService {
|
||||
func newStatsService(ctx context.Context, replica *collectionReplica) *statsService {
|
||||
|
||||
return &statsService{
|
||||
ctx: ctx,
|
||||
pulsarURL: pulsarURL,
|
||||
msgStream: nil,
|
||||
container: container,
|
||||
ctx: ctx,
|
||||
statsStream: nil,
|
||||
replica: replica,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -38,16 +34,20 @@ func (sService *statsService) start() {
|
|||
)
|
||||
|
||||
// start pulsar
|
||||
msgStreamURL, err := Params.PulsarAddress()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
producerChannels := []string{"statistic"}
|
||||
|
||||
statsStream := msgstream.NewPulsarMsgStream(sService.ctx, receiveBufSize)
|
||||
statsStream.SetPulsarCient(sService.pulsarURL)
|
||||
statsStream.SetPulsarCient(msgStreamURL)
|
||||
statsStream.CreatePulsarProducers(producerChannels)
|
||||
|
||||
var statsMsgStream msgstream.MsgStream = statsStream
|
||||
|
||||
sService.msgStream = &statsMsgStream
|
||||
(*sService.msgStream).Start()
|
||||
sService.statsStream = &statsMsgStream
|
||||
(*sService.statsStream).Start()
|
||||
|
||||
// start service
|
||||
fmt.Println("do segments statistic in ", strconv.Itoa(sleepMillisecondTime), "ms")
|
||||
|
@ -62,7 +62,7 @@ func (sService *statsService) start() {
|
|||
}
|
||||
|
||||
func (sService *statsService) sendSegmentStatistic() {
|
||||
statisticData := (*sService.container).getSegmentStatistics()
|
||||
statisticData := (*sService.replica).getSegmentStatistics()
|
||||
|
||||
// fmt.Println("Publish segment statistic")
|
||||
// fmt.Println(statisticData)
|
||||
|
@ -80,7 +80,7 @@ func (sService *statsService) publicStatistic(statistic *internalpb.QueryNodeSeg
|
|||
var msgPack = msgstream.MsgPack{
|
||||
Msgs: []msgstream.TsMsg{msg},
|
||||
}
|
||||
err := (*sService.msgStream).Produce(&msgPack)
|
||||
err := (*sService.statsStream).Produce(&msgPack)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
|
|
|
@ -15,6 +15,7 @@ import (
|
|||
|
||||
// NOTE: start pulsar before test
|
||||
func TestStatsService_start(t *testing.T) {
|
||||
Params.Init()
|
||||
var ctx context.Context
|
||||
|
||||
if closeWithDeadline {
|
||||
|
@ -27,8 +28,7 @@ func TestStatsService_start(t *testing.T) {
|
|||
}
|
||||
|
||||
// init query node
|
||||
pulsarURL := "pulsar://localhost:6650"
|
||||
node := NewQueryNode(ctx, 0, pulsarURL)
|
||||
node := NewQueryNode(ctx, 0)
|
||||
|
||||
// init meta
|
||||
collectionName := "collection0"
|
||||
|
@ -72,29 +72,30 @@ func TestStatsService_start(t *testing.T) {
|
|||
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
|
||||
assert.NotEqual(t, "", collectionMetaBlob)
|
||||
|
||||
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
|
||||
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
|
||||
assert.NoError(t, err)
|
||||
|
||||
collection, err := (*node.container).getCollectionByName(collectionName)
|
||||
collection, err := (*node.replica).getCollectionByName(collectionName)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, collection.meta.Schema.Name, "collection0")
|
||||
assert.Equal(t, collection.meta.ID, UniqueID(0))
|
||||
assert.Equal(t, (*node.container).getCollectionNum(), 1)
|
||||
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
|
||||
|
||||
err = (*node.container).addPartition(collection.ID(), collectionMeta.PartitionTags[0])
|
||||
err = (*node.replica).addPartition(collection.ID(), collectionMeta.PartitionTags[0])
|
||||
assert.NoError(t, err)
|
||||
|
||||
segmentID := UniqueID(0)
|
||||
err = (*node.container).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0))
|
||||
err = (*node.replica).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0))
|
||||
assert.NoError(t, err)
|
||||
|
||||
// start stats service
|
||||
node.statsService = newStatsService(node.ctx, node.container, node.pulsarURL)
|
||||
node.statsService = newStatsService(node.ctx, node.replica)
|
||||
node.statsService.start()
|
||||
}
|
||||
|
||||
// NOTE: start pulsar before test
|
||||
func TestSegmentManagement_SegmentStatisticService(t *testing.T) {
|
||||
Params.Init()
|
||||
var ctx context.Context
|
||||
|
||||
if closeWithDeadline {
|
||||
|
@ -108,7 +109,7 @@ func TestSegmentManagement_SegmentStatisticService(t *testing.T) {
|
|||
|
||||
// init query node
|
||||
pulsarURL := "pulsar://localhost:6650"
|
||||
node := NewQueryNode(ctx, 0, pulsarURL)
|
||||
node := NewQueryNode(ctx, 0)
|
||||
|
||||
// init meta
|
||||
collectionName := "collection0"
|
||||
|
@ -152,20 +153,20 @@ func TestSegmentManagement_SegmentStatisticService(t *testing.T) {
|
|||
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
|
||||
assert.NotEqual(t, "", collectionMetaBlob)
|
||||
|
||||
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
|
||||
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
|
||||
assert.NoError(t, err)
|
||||
|
||||
collection, err := (*node.container).getCollectionByName(collectionName)
|
||||
collection, err := (*node.replica).getCollectionByName(collectionName)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, collection.meta.Schema.Name, "collection0")
|
||||
assert.Equal(t, collection.meta.ID, UniqueID(0))
|
||||
assert.Equal(t, (*node.container).getCollectionNum(), 1)
|
||||
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
|
||||
|
||||
err = (*node.container).addPartition(collection.ID(), collectionMeta.PartitionTags[0])
|
||||
err = (*node.replica).addPartition(collection.ID(), collectionMeta.PartitionTags[0])
|
||||
assert.NoError(t, err)
|
||||
|
||||
segmentID := UniqueID(0)
|
||||
err = (*node.container).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0))
|
||||
err = (*node.replica).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0))
|
||||
assert.NoError(t, err)
|
||||
|
||||
const receiveBufSize = 1024
|
||||
|
@ -178,9 +179,9 @@ func TestSegmentManagement_SegmentStatisticService(t *testing.T) {
|
|||
|
||||
var statsMsgStream msgstream.MsgStream = statsStream
|
||||
|
||||
node.statsService = newStatsService(node.ctx, node.container, node.pulsarURL)
|
||||
node.statsService.msgStream = &statsMsgStream
|
||||
(*node.statsService.msgStream).Start()
|
||||
node.statsService = newStatsService(node.ctx, node.replica)
|
||||
node.statsService.statsStream = &statsMsgStream
|
||||
(*node.statsService.statsStream).Start()
|
||||
|
||||
// send stats
|
||||
node.statsService.sendSegmentStatistic()
|
||||
|
|
|
@ -2,6 +2,7 @@ package flowgraph
|
|||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"sync"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/errors"
|
||||
|
@ -68,6 +69,15 @@ func (fg *TimeTickedFlowGraph) Start() {
|
|||
|
||||
func (fg *TimeTickedFlowGraph) Close() {
|
||||
for _, v := range fg.nodeCtx {
|
||||
// close message stream
|
||||
if (*v.node).IsInputNode() {
|
||||
inStream, ok := (*v.node).(*InputNode)
|
||||
if !ok {
|
||||
log.Fatal("Invalid inputNode")
|
||||
}
|
||||
(*inStream.inStream).Close()
|
||||
}
|
||||
// close input channels
|
||||
v.Close()
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue