mirror of https://github.com/milvus-io/milvus.git

Enable complex dsl parser

Signed-off-by: FluorineDog <guilin.gou@zilliz.com>
branch: pull/4973/head^2
parent 0d75840ed6
commit e65cfe1e3d

Makefile (2 lines changed):
@ -61,7 +61,7 @@ ruleguard:

 verifiers: getdeps cppcheck fmt lint ruleguard

 # Builds various components locally.
-build-go:
+build-go: build-cpp
	@echo "Building each component's binary to './bin'"
	@echo "Building master ..."
	@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="0" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/master $(PWD)/cmd/master/main.go 1>/dev/null
@ -0,0 +1,17 @@ (new file: binlog CLI entry point)
package main

import (
	"fmt"
	"os"

	"github.com/zilliztech/milvus-distributed/internal/storage"
)

func main() {
	if len(os.Args) == 1 {
		fmt.Println("usage: binlog file1 file2 ...")
		return // nothing to print without file arguments
	}
	if err := storage.PrintBinlogFiles(os.Args[1:]); err != nil {
		fmt.Printf("error: %s\n", err.Error())
	}
}
@ -20,6 +20,7 @@
 #include <memory>
 #include <boost/align/aligned_allocator.hpp>
 #include <boost/algorithm/string.hpp>
+#include <algorithm>

 namespace milvus::query {
@ -39,10 +40,8 @@ const std::map<std::string, RangeExpr::OpType> RangeExpr::mapping_ = {

 class Parser {
  public:
-    static std::unique_ptr<Plan>
-    CreatePlan(const Schema& schema, const std::string& dsl_str) {
-        return Parser(schema).CreatePlanImpl(dsl_str);
-    }
+    friend std::unique_ptr<Plan>
+    CreatePlan(const Schema& schema, const std::string& dsl_str);

  private:
     std::unique_ptr<Plan>
@ -51,29 +50,55 @@ class Parser {
     explicit Parser(const Schema& schema) : schema(schema) {
     }

     // vector node parser, should be called exactly once per pass
     std::unique_ptr<VectorPlanNode>
     ParseVecNode(const Json& out_body);

+    // dispatcher for all parse functions
+    // NOTE: a nullptr return means a pure vector node
+    ExprPtr
+    ParseAnyNode(const Json& body);
+
+    ExprPtr
+    ParseMustNode(const Json& body);
+
+    ExprPtr
+    ParseShouldNode(const Json& body);
+
+    ExprPtr
+    ParseShouldNotNode(const Json& body);
+
+    // parse the value of a "should"/"must"/"should_not" entry
+    std::vector<ExprPtr>
+    ParseItemList(const Json& body);
+
+    // parse the value of a "range" entry
+    ExprPtr
+    ParseRangeNode(const Json& out_body);
+
+    // parse the value of a "term" entry
+    ExprPtr
+    ParseTermNode(const Json& out_body);
+
  private:
     // template implementations of the leaf parsers,
     // used by the corresponding parsers above

     template <typename T>
-    std::unique_ptr<Expr>
+    ExprPtr
     ParseRangeNodeImpl(const std::string& field_name, const Json& body);

     template <typename T>
-    std::unique_ptr<Expr>
+    ExprPtr
     ParseTermNodeImpl(const std::string& field_name, const Json& body);

-    std::unique_ptr<Expr>
-    ParseRangeNode(const Json& out_body);
-
-    std::unique_ptr<Expr>
-    ParseTermNode(const Json& out_body);
-
  private:
     const Schema& schema;
     std::map<std::string, FieldId> tag2field_;  // PlaceholderName -> FieldId
+    std::optional<std::unique_ptr<VectorPlanNode>> vector_node_opt_;
 };

-std::unique_ptr<Expr>
+ExprPtr
 Parser::ParseRangeNode(const Json& out_body) {
     Assert(out_body.is_object());
     Assert(out_body.size() == 1);
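For orientation, here is a sketch of the nested DSL shape the new dispatcher accepts, assuming nlohmann::json (aliased as Json in this file); the field names "age"/"fakevec" and the range key "GT" are illustrative, not taken from this diff:

#include <nlohmann/json.hpp>
#include <iostream>

int main() {
    // "must"/"should"/"should_not" may now nest arbitrarily; exactly one
    // "vector" leaf must appear somewhere in the tree.
    auto dsl = nlohmann::json::parse(R"({
        "bool": {
            "must": [
                {"should": [
                    {"term": {"age": {"values": [1, 2, 3]}}},
                    {"range": {"age": {"GT": 10}}}
                ]},
                {"vector": {"fakevec": {"metric_type": "L2",
                                        "params": {"nprobe": 10},
                                        "query": "$0", "topk": 10}}}
            ]
        }
    })");
    std::cout << dsl.dump(2) << std::endl;  // pretty-print the accepted shape
    return 0;
}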
@ -84,9 +109,8 @@ Parser::ParseRangeNode(const Json& out_body) {
     Assert(!field_is_vector(data_type));

     switch (data_type) {
-        case DataType::BOOL: {
+        case DataType::BOOL:
             return ParseRangeNodeImpl<bool>(field_name, body);
-        }
         case DataType::INT8:
             return ParseRangeNodeImpl<int8_t>(field_name, body);
         case DataType::INT16:
@ -106,51 +130,22 @@ Parser::ParseRangeNode(const Json& out_body) {

 std::unique_ptr<Plan>
 Parser::CreatePlanImpl(const std::string& dsl_str) {
-    auto plan = std::make_unique<Plan>(schema);
-    auto dsl = nlohmann::json::parse(dsl_str);
-    nlohmann::json vec_pack;
-    std::optional<std::unique_ptr<Expr>> predicate;
-    // top level
-    auto& bool_dsl = dsl.at("bool");
-    if (bool_dsl.contains("must")) {
-        auto& packs = bool_dsl.at("must");
-        Assert(packs.is_array());
-        for (auto& pack : packs) {
-            if (pack.contains("vector")) {
-                auto& out_body = pack.at("vector");
-                plan->plan_node_ = ParseVecNode(out_body);
-            } else if (pack.contains("term")) {
-                AssertInfo(!predicate, "unsupported complex DSL");
-                auto& out_body = pack.at("term");
-                predicate = ParseTermNode(out_body);
-            } else if (pack.contains("range")) {
-                AssertInfo(!predicate, "unsupported complex DSL");
-                auto& out_body = pack.at("range");
-                predicate = ParseRangeNode(out_body);
-            } else {
-                PanicInfo("unsupported node");
-            }
-        }
-        AssertInfo(plan->plan_node_, "vector node not found");
-    } else if (bool_dsl.contains("vector")) {
-        auto& out_body = bool_dsl.at("vector");
-        plan->plan_node_ = ParseVecNode(out_body);
-        Assert(plan->plan_node_);
-    } else {
-        PanicInfo("Unsupported DSL");
-    }
-    plan->plan_node_->predicate_ = std::move(predicate);
+    auto dsl = Json::parse(dsl_str);
+    auto bool_dsl = dsl.at("bool");
+    auto predicate = ParseAnyNode(bool_dsl);
+    Assert(vector_node_opt_.has_value());
+    auto vec_node = std::move(vector_node_opt_).value();
+    if (predicate != nullptr) {
+        vec_node->predicate_ = std::move(predicate);
+    }
+
+    auto plan = std::make_unique<Plan>(schema);
+    plan->tag2field_ = std::move(tag2field_);
+    // TODO: target_entry parser
+    // if schema autoid is true,
+    //     prepend target_entries_ with row_id
+    // else
+    //     prepend with primary_key
+    plan->plan_node_ = std::move(vec_node);
     return plan;
 }

-std::unique_ptr<Expr>
+ExprPtr
 Parser::ParseTermNode(const Json& out_body) {
     Assert(out_body.size() == 1);
     auto out_iter = out_body.begin();
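CreatePlanImpl moves the parsed vector node out of the std::optional member via `std::move(vector_node_opt_).value()`; a minimal standalone sketch of why the optional itself is moved:

#include <cassert>
#include <memory>
#include <optional>

int main() {
    std::optional<std::unique_ptr<int>> opt = std::make_unique<int>(42);
    // value() on an rvalue optional returns T&&, so the unique_ptr is
    // moved out rather than copied (unique_ptr is move-only).
    auto ptr = std::move(opt).value();
    assert(ptr != nullptr && *ptr == 42);
    return 0;
}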
@ -221,7 +216,7 @@ Parser::ParseVecNode(const Json& out_body) {
 }

 template <typename T>
-std::unique_ptr<Expr>
+ExprPtr
 Parser::ParseTermNodeImpl(const std::string& field_name, const Json& body) {
     auto expr = std::make_unique<TermExprImpl<T>>();
     auto data_type = schema[field_name].get_data_type();
@ -249,7 +244,7 @@ Parser::ParseTermNodeImpl(const std::string& field_name, const Json& body) {
 }

 template <typename T>
-std::unique_ptr<Expr>
+ExprPtr
 Parser::ParseRangeNodeImpl(const std::string& field_name, const Json& body) {
     auto expr = std::make_unique<RangeExprImpl<T>>();
     auto data_type = schema[field_name].get_data_type();
@ -278,12 +273,6 @@ Parser::ParseRangeNodeImpl(const std::string& field_name, const Json& body) {
     return expr;
 }

-std::unique_ptr<Plan>
-CreatePlan(const Schema& schema, const std::string& dsl_str) {
-    auto plan = Parser::CreatePlan(schema, dsl_str);
-    return plan;
-}
-
 std::unique_ptr<PlaceholderGroup>
 ParsePlaceholderGroup(const Plan* plan, const std::string& blob) {
     namespace ser = milvus::proto::service;
@ -313,6 +302,150 @@ ParsePlaceholderGroup(const Plan* plan, const std::string& blob) {
     return result;
 }

+std::unique_ptr<Plan>
+CreatePlan(const Schema& schema, const std::string& dsl_str) {
+    auto plan = Parser(schema).CreatePlanImpl(dsl_str);
+    return plan;
+}
+
+std::vector<ExprPtr>
+Parser::ParseItemList(const Json& body) {
+    std::vector<ExprPtr> results;
+    if (body.is_object()) {
+        // only one item
+        auto new_entry = ParseAnyNode(body);
+        results.emplace_back(std::move(new_entry));
+    } else {
+        // item array
+        Assert(body.is_array());
+        for (auto& item : body) {
+            auto new_entry = ParseAnyNode(item);
+            results.emplace_back(std::move(new_entry));
+        }
+    }
+    // drop the nullptr placeholders produced by pure vector nodes
+    auto new_end = std::remove_if(results.begin(), results.end(), [](const ExprPtr& x) { return x == nullptr; });
+    results.resize(new_end - results.begin());
+    return results;
+}
+
+ExprPtr
+Parser::ParseAnyNode(const Json& out_body) {
+    Assert(out_body.is_object());
+    Assert(out_body.size() == 1);
+
+    auto out_iter = out_body.begin();
+
+    auto key = out_iter.key();
+    auto body = out_iter.value();
+
+    if (key == "must") {
+        return ParseMustNode(body);
+    } else if (key == "should") {
+        return ParseShouldNode(body);
+    } else if (key == "should_not") {
+        return ParseShouldNotNode(body);
+    } else if (key == "range") {
+        return ParseRangeNode(body);
+    } else if (key == "term") {
+        return ParseTermNode(body);
+    } else if (key == "vector") {
+        auto vec_node = ParseVecNode(body);
+        Assert(!vector_node_opt_.has_value());
+        vector_node_opt_ = std::move(vec_node);
+        return nullptr;
+    } else {
+        PanicInfo("unsupported key: " + key);
+    }
+}
+
+template <typename Merger>
+static ExprPtr
+ConstructTree(Merger merger, std::vector<ExprPtr> item_list) {
+    if (item_list.size() == 0) {
+        return nullptr;
+    }
+
+    if (item_list.size() == 1) {
+        return std::move(item_list[0]);
+    }
+
+    // Note: use a deque to construct a balanced binary tree
+    //            Op
+    //          /    \
+    //        Op      Op
+    //       /  \    /  \
+    //      A    B  C    D
+    std::deque<ExprPtr> binary_queue;
+    for (auto& item : item_list) {
+        Assert(item != nullptr);
+        binary_queue.push_back(std::move(item));
+    }
+    while (binary_queue.size() > 1) {
+        auto left = std::move(binary_queue.front());
+        binary_queue.pop_front();
+        auto right = std::move(binary_queue.front());
+        binary_queue.pop_front();
+        binary_queue.push_back(merger(std::move(left), std::move(right)));
+    }
+    Assert(binary_queue.size() == 1);
+    return std::move(binary_queue.front());
+}
+
+ExprPtr
+Parser::ParseMustNode(const Json& body) {
+    auto item_list = ParseItemList(body);
+    auto merger = [](ExprPtr left, ExprPtr right) {
+        using OpType = BoolBinaryExpr::OpType;
+        auto res = std::make_unique<BoolBinaryExpr>();
+        res->op_type_ = OpType::LogicalAnd;
+        res->left_ = std::move(left);
+        res->right_ = std::move(right);
+        return res;
+    };
+    return ConstructTree(merger, std::move(item_list));
+}
+
+ExprPtr
+Parser::ParseShouldNode(const Json& body) {
+    auto item_list = ParseItemList(body);
+    Assert(item_list.size() >= 1);
+    auto merger = [](ExprPtr left, ExprPtr right) {
+        using OpType = BoolBinaryExpr::OpType;
+        auto res = std::make_unique<BoolBinaryExpr>();
+        res->op_type_ = OpType::LogicalOr;
+        res->left_ = std::move(left);
+        res->right_ = std::move(right);
+        return res;
+    };
+    return ConstructTree(merger, std::move(item_list));
+}
+
+ExprPtr
+Parser::ParseShouldNotNode(const Json& body) {
+    // "should_not" is parsed as NOT(item_0 AND item_1 AND ...)
+    auto item_list = ParseItemList(body);
+    Assert(item_list.size() >= 1);
+    auto merger = [](ExprPtr left, ExprPtr right) {
+        using OpType = BoolBinaryExpr::OpType;
+        auto res = std::make_unique<BoolBinaryExpr>();
+        res->op_type_ = OpType::LogicalAnd;
+        res->left_ = std::move(left);
+        res->right_ = std::move(right);
+        return res;
+    };
+    auto subtree = ConstructTree(merger, std::move(item_list));
+
+    using OpType = BoolUnaryExpr::OpType;
+    auto res = std::make_unique<BoolUnaryExpr>();
+    res->op_type_ = OpType::LogicalNot;
+    res->child_ = std::move(subtree);
+
+    return res;
+}
+
+int64_t
+GetTopK(const Plan* plan) {
+    return plan->plan_node_->query_info_.topK_;
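The deque-based reduction in ConstructTree is worth seeing in isolation; a self-contained sketch on strings, where merging with "AND" stands in for building BoolBinaryExpr nodes:

#include <deque>
#include <iostream>
#include <string>
#include <vector>

static std::string
Reduce(std::vector<std::string> items) {
    std::deque<std::string> queue(items.begin(), items.end());
    // pop two from the front, merge, push the result to the back:
    // this pairs items level by level and yields a balanced tree
    while (queue.size() > 1) {
        auto left = std::move(queue.front());
        queue.pop_front();
        auto right = std::move(queue.front());
        queue.pop_front();
        queue.push_back("(" + left + " AND " + right + ")");
    }
    return queue.front();
}

int main() {
    // prints ((A AND B) AND (C AND D)) -- balanced, not a left-deep chain
    std::cout << Reduce({"A", "B", "C", "D"}) << std::endl;
    return 0;
}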
@ -67,6 +67,7 @@ ExecExprVisitor::visit(BoolUnaryExpr& expr) {
     switch (expr.op_type_) {
         case OpType::LogicalNot: {
             chunk.flip();
+            break;
         }
         default: {
             PanicInfo("Invalid OpType");
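The added break lets LogicalNot stay a pure bit-flip instead of falling through into the PanicInfo default. A tiny sketch of the same idea, with std::bitset standing in for the segment's per-chunk result type (an assumption; the real type is not shown in this hunk):

#include <bitset>
#include <iostream>

int main() {
    std::bitset<8> chunk("10110010");  // a mock per-chunk result bitmap
    chunk.flip();                      // LogicalNot over the whole chunk
    std::cout << chunk << std::endl;   // prints 01001101
    return 0;
}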
@ -410,3 +410,104 @@ TEST(Expr, TestTerm) {
         }
     }
 }
+
+TEST(Expr, TestSimpleDsl) {
+    using namespace milvus::query;
+    using namespace milvus::segcore;
+
+    auto vec_dsl = Json::parse(R"(
+    {
+        "vector": {
+            "fakevec": {
+                "metric_type": "L2",
+                "params": {
+                    "nprobe": 10
+                },
+                "query": "$0",
+                "topk": 10
+            }
+        }
+    }
+    )");
+
+    int N = 32;
+    // get_item(base, bit) builds a term clause selecting every value
+    // whose base-th bit equals bit
+    auto get_item = [&](int base, int bit = 1) {
+        std::vector<int> terms;
+        // note: random gen range is [0, 2N)
+        for (int i = 0; i < N * 2; ++i) {
+            if (((i >> base) & 0x1) == bit) {
+                terms.push_back(i);
+            }
+        }
+        Json s;
+        s["term"]["age"]["values"] = terms;
+        return s;
+    };
+    // std::cout << get_item(0).dump(-2);
+    // std::cout << vec_dsl.dump(-2);
+    std::vector<std::tuple<Json, std::function<bool(int)>>> testcases;
+    {
+        Json dsl;
+        dsl["must"] = Json::array({vec_dsl, get_item(0), get_item(1), get_item(2, 0), get_item(3)});
+        testcases.emplace_back(dsl, [](int x) { return (x & 0b1111) == 0b1011; });
+    }
+
+    {
+        Json dsl;
+        Json sub_dsl;
+        sub_dsl["must"] = Json::array({get_item(0), get_item(1), get_item(2, 0), get_item(3)});
+        dsl["must"] = Json::array({sub_dsl, vec_dsl});
+        testcases.emplace_back(dsl, [](int x) { return (x & 0b1111) == 0b1011; });
+    }
+
+    {
+        Json dsl;
+        Json sub_dsl;
+        sub_dsl["should"] = Json::array({get_item(0), get_item(1), get_item(2, 0), get_item(3)});
+        dsl["must"] = Json::array({sub_dsl, vec_dsl});
+        testcases.emplace_back(dsl, [](int x) { return !!((x & 0b1111) ^ 0b0100); });
+    }
+
+    {
+        Json dsl;
+        Json sub_dsl;
+        sub_dsl["should_not"] = Json::array({get_item(0), get_item(1), get_item(2, 0), get_item(3)});
+        dsl["must"] = Json::array({sub_dsl, vec_dsl});
+        testcases.emplace_back(dsl, [](int x) { return (x & 0b1111) != 0b1011; });
+    }
+
+    auto schema = std::make_shared<Schema>();
+    schema->AddField("fakevec", DataType::VECTOR_FLOAT, 16, MetricType::METRIC_L2);
+    schema->AddField("age", DataType::INT32);
+
+    auto seg = CreateSegment(schema);
+    std::vector<int> age_col;
+    int num_iters = 100;
+    for (int iter = 0; iter < num_iters; ++iter) {
+        auto raw_data = DataGen(schema, N, iter);
+        auto new_age_col = raw_data.get_col<int>(1);
+        age_col.insert(age_col.end(), new_age_col.begin(), new_age_col.end());
+        seg->PreInsert(N);
+        seg->Insert(iter * N, N, raw_data.row_ids_.data(), raw_data.timestamps_.data(), raw_data.raw_);
+    }
+
+    auto seg_promote = dynamic_cast<SegmentSmallIndex*>(seg.get());
+    ExecExprVisitor visitor(*seg_promote);
+    for (auto [clause, ref_func] : testcases) {
+        Json dsl;
+        dsl["bool"] = clause;
+        // std::cout << dsl.dump(2);
+        auto plan = CreatePlan(*schema, dsl.dump());
+        auto final = visitor.call_child(*plan->plan_node_->predicate_.value());
+        EXPECT_EQ(final.size(), upper_div(N * num_iters, DefaultElementPerChunk));
+
+        for (int i = 0; i < N * num_iters; ++i) {
+            auto vec_id = i / DefaultElementPerChunk;
+            auto offset = i % DefaultElementPerChunk;
+            bool ans = final[vec_id][offset];
+            auto val = age_col[i];
+            auto ref = ref_func(val);
+            ASSERT_EQ(ans, ref) << clause << "@" << i << "!!" << val;
+        }
+    }
+}
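The reference lambdas encode the term clauses as bit tests: ANDing get_item(0), get_item(1), get_item(2, 0), get_item(3) keeps exactly the values whose low nibble is 0b1011. A worked check of that encoding:

#include <cassert>

int main() {
    int x = 0b1011;  // bit0 = 1, bit1 = 1, bit2 = 0, bit3 = 1
    assert(((x >> 0) & 1) == 1);
    assert(((x >> 1) & 1) == 1);
    assert(((x >> 2) & 1) == 0);
    assert(((x >> 3) & 1) == 1);
    assert((x & 0b1111) == 0b1011);  // matches the "must" reference lambda
    return 0;
}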
@ -0,0 +1,58 @@ (new file: binlog print test in package storage)
package storage

import (
	"os"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
	"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
)

func TestPrintBinlogFilesInt64(t *testing.T) {
	w, err := NewInsertBinlogWriter(schemapb.DataType_INT64, 10, 20, 30, 40)
	assert.Nil(t, err)

	curTS := time.Now().UnixNano() / int64(time.Millisecond)

	e1, err := w.NextInsertEventWriter()
	assert.Nil(t, err)
	err = e1.AddDataToPayload([]int64{1, 2, 3})
	assert.Nil(t, err)
	// a payload of the wrong element type must be rejected
	err = e1.AddDataToPayload([]int32{4, 5, 6})
	assert.NotNil(t, err)
	err = e1.AddDataToPayload([]int64{4, 5, 6})
	assert.Nil(t, err)
	e1.SetStartTimestamp(tsoutil.ComposeTS(curTS+10*60*1000, 0))
	e1.SetEndTimestamp(tsoutil.ComposeTS(curTS+20*60*1000, 0))

	e2, err := w.NextInsertEventWriter()
	assert.Nil(t, err)
	err = e2.AddDataToPayload([]int64{7, 8, 9})
	assert.Nil(t, err)
	err = e2.AddDataToPayload([]bool{true, false, true})
	assert.NotNil(t, err)
	err = e2.AddDataToPayload([]int64{10, 11, 12})
	assert.Nil(t, err)
	e2.SetStartTimestamp(tsoutil.ComposeTS(curTS+30*60*1000, 0))
	e2.SetEndTimestamp(tsoutil.ComposeTS(curTS+40*60*1000, 0))

	w.SetStartTimeStamp(tsoutil.ComposeTS(curTS, 0))
	w.SetEndTimeStamp(tsoutil.ComposeTS(curTS+3600*1000, 0))

	// the buffer is only available after the writer is closed
	_, err = w.GetBuffer()
	assert.NotNil(t, err)
	err = w.Close()
	assert.Nil(t, err)
	buf, err := w.GetBuffer()
	assert.Nil(t, err)

	fd, err := os.Create("/tmp/binlog_int64.db")
	assert.Nil(t, err)
	num, err := fd.Write(buf)
	assert.Nil(t, err)
	assert.Equal(t, num, len(buf))
	err = fd.Close()
	assert.Nil(t, err)
}
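The file this test leaves behind (/tmp/binlog_int64.db) doubles as a fixture for the new CLI above: running `binlog /tmp/binlog_int64.db` prints the descriptor event and both insert events written here.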
@ -0,0 +1,347 @@ (new file: binlog pretty-printer in package storage)
package storage

import (
	"fmt"
	"os"
	"syscall"

	"github.com/golang/protobuf/proto"
	"github.com/zilliztech/milvus-distributed/internal/errors"
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
	"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
	"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
)

func PrintBinlogFiles(fileList []string) error {
	for _, file := range fileList {
		if err := printBinlogFile(file); err != nil {
			return err
		}
	}
	return nil
}

func printBinlogFile(filename string) error {
	fd, err := os.OpenFile(filename, os.O_RDONLY, 0400)
	if err != nil {
		return err
	}
	defer fd.Close()

	fileInfo, err := fd.Stat()
	if err != nil {
		return err
	}

	fmt.Printf("file size = %d\n", fileInfo.Size())

	b, err := syscall.Mmap(int(fd.Fd()), 0, int(fileInfo.Size()), syscall.PROT_READ, syscall.MAP_SHARED)
	if err != nil {
		return err // a failed mmap must abort the dump
	}
	defer syscall.Munmap(b)

	fmt.Printf("buf size = %d\n", len(b))

	r, err := NewBinlogReader(b)
	if err != nil {
		return err
	}
	defer r.Close()

	fmt.Println("descriptor event header:")
	physical, _ := tsoutil.ParseTS(r.descriptorEvent.descriptorEventHeader.Timestamp)
	fmt.Printf("\tTimestamp: %v\n", physical)
	fmt.Printf("\tTypeCode: %s\n", r.descriptorEvent.descriptorEventHeader.TypeCode.String())
	fmt.Printf("\tServerID: %d\n", r.descriptorEvent.descriptorEventHeader.ServerID)
	fmt.Printf("\tEventLength: %d\n", r.descriptorEvent.descriptorEventHeader.EventLength)
	fmt.Printf("\tNextPosition: %d\n", r.descriptorEvent.descriptorEventHeader.NextPosition)
	fmt.Println("descriptor event data:")
	fmt.Printf("\tBinlogVersion: %d\n", r.descriptorEvent.descriptorEventData.BinlogVersion)
	fmt.Printf("\tServerVersion: %d\n", r.descriptorEvent.descriptorEventData.ServerVersion)
	fmt.Printf("\tCommitID: %d\n", r.descriptorEvent.descriptorEventData.CommitID)
	fmt.Printf("\tHeaderLength: %d\n", r.descriptorEvent.descriptorEventData.HeaderLength)
	fmt.Printf("\tCollectionID: %d\n", r.descriptorEvent.descriptorEventData.CollectionID)
	fmt.Printf("\tPartitionID: %d\n", r.descriptorEvent.descriptorEventData.PartitionID)
	fmt.Printf("\tSegmentID: %d\n", r.descriptorEvent.descriptorEventData.SegmentID)
	fmt.Printf("\tFieldID: %d\n", r.descriptorEvent.descriptorEventData.FieldID)
	physical, _ = tsoutil.ParseTS(r.descriptorEvent.descriptorEventData.StartTimestamp)
	fmt.Printf("\tStartTimestamp: %v\n", physical)
	physical, _ = tsoutil.ParseTS(r.descriptorEvent.descriptorEventData.EndTimestamp)
	fmt.Printf("\tEndTimestamp: %v\n", physical)
	dataTypeName, ok := schemapb.DataType_name[int32(r.descriptorEvent.descriptorEventData.PayloadDataType)]
	if !ok {
		return errors.Errorf("undefined data type %d", r.descriptorEvent.descriptorEventData.PayloadDataType)
	}
	fmt.Printf("\tPayloadDataType: %v\n", dataTypeName)
	fmt.Printf("\tPostHeaderLengths: %v\n", r.descriptorEvent.descriptorEventData.PostHeaderLengths)
	eventNum := 0
	for {
		event, err := r.NextEventReader()
		if err != nil {
			return err
		}
		if event == nil {
			break
		}
		fmt.Printf("event %d header:\n", eventNum)
		physical, _ = tsoutil.ParseTS(event.eventHeader.Timestamp)
		fmt.Printf("\tTimestamp: %v\n", physical)
		fmt.Printf("\tTypeCode: %s\n", event.eventHeader.TypeCode.String())
		fmt.Printf("\tServerID: %d\n", event.eventHeader.ServerID)
		fmt.Printf("\tEventLength: %d\n", event.eventHeader.EventLength)
		fmt.Printf("\tNextPosition: %d\n", event.eventHeader.NextPosition)
		switch event.eventHeader.TypeCode {
		case InsertEventType:
			evd, ok := event.eventData.(*insertEventData)
			if !ok {
				return errors.Errorf("incorrect event data type")
			}
			fmt.Printf("event %d insert event:\n", eventNum)
			physical, _ = tsoutil.ParseTS(evd.StartTimestamp)
			fmt.Printf("\tStartTimestamp: %v\n", physical)
			physical, _ = tsoutil.ParseTS(evd.EndTimestamp)
			fmt.Printf("\tEndTimestamp: %v\n", physical)
			if err := printPayloadValues(r.descriptorEvent.descriptorEventData.PayloadDataType, event.PayloadReaderInterface); err != nil {
				return err
			}
		case DeleteEventType:
			evd, ok := event.eventData.(*deleteEventData)
			if !ok {
				return errors.Errorf("incorrect event data type")
			}
			fmt.Printf("event %d delete event:\n", eventNum)
			physical, _ = tsoutil.ParseTS(evd.StartTimestamp)
			fmt.Printf("\tStartTimestamp: %v\n", physical)
			physical, _ = tsoutil.ParseTS(evd.EndTimestamp)
			fmt.Printf("\tEndTimestamp: %v\n", physical)
			if err := printPayloadValues(r.descriptorEvent.descriptorEventData.PayloadDataType, event.PayloadReaderInterface); err != nil {
				return err
			}
		case CreateCollectionEventType:
			evd, ok := event.eventData.(*createCollectionEventData)
			if !ok {
				return errors.Errorf("incorrect event data type")
			}
			fmt.Printf("event %d create collection event:\n", eventNum)
			physical, _ = tsoutil.ParseTS(evd.StartTimestamp)
			fmt.Printf("\tStartTimestamp: %v\n", physical)
			physical, _ = tsoutil.ParseTS(evd.EndTimestamp)
			fmt.Printf("\tEndTimestamp: %v\n", physical)
			if err := printDDLPayloadValues(event.eventHeader.TypeCode, r.descriptorEvent.descriptorEventData.PayloadDataType, event.PayloadReaderInterface); err != nil {
				return err
			}
		case DropCollectionEventType:
			evd, ok := event.eventData.(*dropCollectionEventData)
			if !ok {
				return errors.Errorf("incorrect event data type")
			}
			fmt.Printf("event %d drop collection event:\n", eventNum)
			physical, _ = tsoutil.ParseTS(evd.StartTimestamp)
			fmt.Printf("\tStartTimestamp: %v\n", physical)
			physical, _ = tsoutil.ParseTS(evd.EndTimestamp)
			fmt.Printf("\tEndTimestamp: %v\n", physical)
			if err := printDDLPayloadValues(event.eventHeader.TypeCode, r.descriptorEvent.descriptorEventData.PayloadDataType, event.PayloadReaderInterface); err != nil {
				return err
			}
		case CreatePartitionEventType:
			evd, ok := event.eventData.(*createPartitionEventData)
			if !ok {
				return errors.Errorf("incorrect event data type")
			}
			fmt.Printf("event %d create partition event:\n", eventNum)
			physical, _ = tsoutil.ParseTS(evd.StartTimestamp)
			fmt.Printf("\tStartTimestamp: %v\n", physical)
			physical, _ = tsoutil.ParseTS(evd.EndTimestamp)
			fmt.Printf("\tEndTimestamp: %v\n", physical)
			if err := printDDLPayloadValues(event.eventHeader.TypeCode, r.descriptorEvent.descriptorEventData.PayloadDataType, event.PayloadReaderInterface); err != nil {
				return err
			}
		case DropPartitionEventType:
			evd, ok := event.eventData.(*dropPartitionEventData)
			if !ok {
				return errors.Errorf("incorrect event data type")
			}
			fmt.Printf("event %d drop partition event:\n", eventNum)
			physical, _ = tsoutil.ParseTS(evd.StartTimestamp)
			fmt.Printf("\tStartTimestamp: %v\n", physical)
			physical, _ = tsoutil.ParseTS(evd.EndTimestamp)
			fmt.Printf("\tEndTimestamp: %v\n", physical)
			if err := printDDLPayloadValues(event.eventHeader.TypeCode, r.descriptorEvent.descriptorEventData.PayloadDataType, event.PayloadReaderInterface); err != nil {
				return err
			}
		default:
			return errors.Errorf("undefined event type %d", event.eventHeader.TypeCode)
		}
		eventNum++
	}

	return nil
}
func printPayloadValues(colType schemapb.DataType, reader PayloadReaderInterface) error {
	fmt.Println("\tpayload values:")
	switch colType {
	case schemapb.DataType_BOOL:
		val, err := reader.GetBoolFromPayload()
		if err != nil {
			return err
		}
		for i, v := range val {
			fmt.Printf("\t\t%d : %v\n", i, v)
		}
	case schemapb.DataType_INT8:
		val, err := reader.GetInt8FromPayload()
		if err != nil {
			return err
		}
		for i, v := range val {
			fmt.Printf("\t\t%d : %d\n", i, v)
		}
	case schemapb.DataType_INT16:
		val, err := reader.GetInt16FromPayload()
		if err != nil {
			return err
		}
		for i, v := range val {
			fmt.Printf("\t\t%d : %d\n", i, v)
		}
	case schemapb.DataType_INT32:
		val, err := reader.GetInt32FromPayload()
		if err != nil {
			return err
		}
		for i, v := range val {
			fmt.Printf("\t\t%d : %d\n", i, v)
		}
	case schemapb.DataType_INT64:
		val, err := reader.GetInt64FromPayload()
		if err != nil {
			return err
		}
		for i, v := range val {
			fmt.Printf("\t\t%d : %d\n", i, v)
		}
	case schemapb.DataType_FLOAT:
		val, err := reader.GetFloatFromPayload()
		if err != nil {
			return err
		}
		for i, v := range val {
			fmt.Printf("\t\t%d : %f\n", i, v)
		}
	case schemapb.DataType_DOUBLE:
		val, err := reader.GetDoubleFromPayload()
		if err != nil {
			return err
		}
		for i, v := range val {
			fmt.Printf("\t\t%d : %f\n", i, v)
		}
	case schemapb.DataType_STRING:
		rows, err := reader.GetPayloadLengthFromReader()
		if err != nil {
			return err
		}
		for i := 0; i < rows; i++ {
			val, err := reader.GetOneStringFromPayload(i)
			if err != nil {
				return err
			}
			fmt.Printf("\t\t%d : %s\n", i, val)
		}
	case schemapb.DataType_VECTOR_BINARY:
		val, dim, err := reader.GetBinaryVectorFromPayload()
		if err != nil {
			return err
		}
		// dim is reported in bits; convert to bytes per vector
		dim = dim / 8
		length := len(val) / dim
		for i := 0; i < length; i++ {
			fmt.Printf("\t\t%d :", i)
			for j := 0; j < dim; j++ {
				idx := i*dim + j
				fmt.Printf(" %02x", val[idx])
			}
			fmt.Println()
		}
	case schemapb.DataType_VECTOR_FLOAT:
		val, dim, err := reader.GetFloatVectorFromPayload()
		if err != nil {
			return err
		}
		length := len(val) / dim
		for i := 0; i < length; i++ {
			fmt.Printf("\t\t%d :", i)
			for j := 0; j < dim; j++ {
				idx := i*dim + j
				fmt.Printf(" %f", val[idx])
			}
			fmt.Println()
		}
	default:
		return errors.Errorf("undefined data type")
	}
	return nil
}
func printDDLPayloadValues(eventType EventTypeCode, colType schemapb.DataType, reader PayloadReaderInterface) error {
	fmt.Println("\tpayload values:")
	switch colType {
	case schemapb.DataType_INT64:
		val, err := reader.GetInt64FromPayload()
		if err != nil {
			return err
		}
		for i, v := range val {
			physical, logical := tsoutil.ParseTS(uint64(v))
			fmt.Printf("\t\t%d : physical : %v ; logical : %d\n", i, physical, logical)
		}
	case schemapb.DataType_STRING:
		rows, err := reader.GetPayloadLengthFromReader()
		if err != nil {
			return err
		}
		for i := 0; i < rows; i++ {
			val, err := reader.GetOneStringFromPayload(i)
			if err != nil {
				return err
			}
			switch eventType {
			case CreateCollectionEventType:
				var req internalpb.CreateCollectionRequest
				if err := proto.Unmarshal([]byte(val), &req); err != nil {
					return err
				}
				msg := proto.MarshalTextString(&req)
				fmt.Printf("\t\t%d : create collection: %s\n", i, msg)
			case DropCollectionEventType:
				// the drop-collection payload must unmarshal into the
				// drop-collection request type
				var req internalpb.DropCollectionRequest
				if err := proto.Unmarshal([]byte(val), &req); err != nil {
					return err
				}
				msg := proto.MarshalTextString(&req)
				fmt.Printf("\t\t%d : drop collection: %s\n", i, msg)
			case CreatePartitionEventType:
				var req internalpb.CreatePartitionRequest
				if err := proto.Unmarshal([]byte(val), &req); err != nil {
					return err
				}
				msg := proto.MarshalTextString(&req)
				fmt.Printf("\t\t%d : create partition: %s\n", i, msg)
			case DropPartitionEventType:
				var req internalpb.DropPartitionRequest
				if err := proto.Unmarshal([]byte(val), &req); err != nil {
					return err
				}
				msg := proto.MarshalTextString(&req)
				fmt.Printf("\t\t%d : drop partition: %s\n", i, msg)
			default:
				return errors.Errorf("undefined ddl event type %d", eventType)
			}
		}
	default:
		return errors.Errorf("undefined data type")
	}
	return nil
}
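ParseTS splits each 64-bit hybrid timestamp back into the physical (wall-clock) and logical (counter) parts that ComposeTS packed together. A standalone sketch of that layout, assuming the common TSO convention of 18 logical bits (an assumption here; check tsoutil for the real constant):

#include <cstdint>
#include <iostream>

constexpr int kLogicalBits = 18;  // assumed width of the logical counter
constexpr uint64_t kLogicalMask = (1ULL << kLogicalBits) - 1;

uint64_t ComposeTS(int64_t physicalMs, int64_t logical) {
    // pack milliseconds-since-epoch into the high bits, counter into the low bits
    return (uint64_t(physicalMs) << kLogicalBits) | (uint64_t(logical) & kLogicalMask);
}

int main() {
    uint64_t ts = ComposeTS(1609459200000, 7);
    std::cout << "physical = " << (ts >> kLogicalBits)
              << ", logical = " << (ts & kLogicalMask) << std::endl;
    return 0;
}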