Add archive-file workflow for caching third-party dependencies

Signed-off-by: quicksilver <zhifeng.zhang@zilliz.com>
pull/4973/head^2
quicksilver 2020-12-08 19:53:22 +08:00 committed by yefu.chen
parent 8ace1c3837
commit d09ad77fb2
28 changed files with 54 additions and 1630 deletions

View File

@ -1,6 +1,7 @@
timeout(time: 10, unit: 'MINUTES') {
dir ("scripts") {
sh '. ./before-install.sh && unset http_proxy && unset https_proxy && ./check_cache.sh -l $CCACHE_ARTFACTORY_URL --cache_dir=\$CCACHE_DIR -f ccache-\$OS_NAME-\$BUILD_ENV_IMAGE_ID.tar.gz || echo \"ccache files not found!\"'
sh '. ./before-install.sh && unset http_proxy && unset https_proxy && ./check_cache.sh -l $CCACHE_ARTFACTORY_URL --cache_dir=\$CCACHE_DIR -f ccache-\$OS_NAME-\$BUILD_ENV_IMAGE_ID.tar.gz || echo \"ccache artfactory files not found!\"'
sh '. ./before-install.sh && unset http_proxy && unset https_proxy && ./check_cache.sh -l $THIRDPARTY_ARTFACTORY_URL --cache_dir=$CUSTOM_THIRDPARTY_PATH -f thirdparty-download.tar.gz || echo \"thirdparty artfactory files not found!\"'
}
sh '. ./scripts/before-install.sh && make install'
@ -8,6 +9,7 @@ timeout(time: 10, unit: 'MINUTES') {
dir ("scripts") {
withCredentials([usernamePassword(credentialsId: "${env.JFROG_CREDENTIALS_ID}", usernameVariable: 'USERNAME', passwordVariable: 'PASSWORD')]) {
sh '. ./before-install.sh && unset http_proxy && unset https_proxy && ./update_cache.sh -l $CCACHE_ARTFACTORY_URL --cache_dir=\$CCACHE_DIR -f ccache-\$OS_NAME-\$BUILD_ENV_IMAGE_ID.tar.gz -u ${USERNAME} -p ${PASSWORD}'
sh '. ./before-install.sh && unset http_proxy && unset https_proxy && ./update_cache.sh -l $THIRDPARTY_ARTFACTORY_URL --cache_dir=$CUSTOM_THIRDPARTY_PATH -f thirdparty-download.tar.gz -u ${USERNAME} -p ${PASSWORD}'
}
}
}

View File

@ -64,12 +64,12 @@ build-go:
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/querynode $(PWD)/cmd/querynode/query_node.go 1>/dev/null
build-cpp:
@(env bash $(PWD)/scripts/core_build.sh)
@(env bash $(PWD)/scripts/cwrapper_build.sh -t Release)
@(env bash $(PWD)/scripts/core_build.sh -f "$(CUSTOM_THIRDPARTY_PATH)")
@(env bash $(PWD)/scripts/cwrapper_build.sh -t Release -f "$(CUSTOM_THIRDPARTY_PATH)")
build-cpp-with-unittest:
@(env bash $(PWD)/scripts/core_build.sh -u)
@(env bash $(PWD)/scripts/cwrapper_build.sh -t Release)
@(env bash $(PWD)/scripts/core_build.sh -u -f "$(CUSTOM_THIRDPARTY_PATH)")
@(env bash $(PWD)/scripts/cwrapper_build.sh -t Release -f "$(CUSTOM_THIRDPARTY_PATH)")
# Runs the tests.
unittest: test-cpp test-go

View File

@ -38,6 +38,8 @@ pipeline {
PULSAR_ADDRESS = "pulsar://127.0.0.1:6650"
ETCD_ADDRESS = "127.0.0.1:2379"
CCACHE_ARTFACTORY_URL = "${JFROG_ARTFACTORY_URL}/milvus-distributed/ccache"
THIRDPARTY_ARTFACTORY_URL = "${JFROG_ARTFACTORY_URL}/milvus-distributed/thirdparty"
CUSTOM_THIRDPARTY_PATH = "${WORKSPACE}/3rdparty_download"
}
steps {
container('build-env') {

View File

@ -38,7 +38,7 @@ static auto map = [] {
MetricType
GetMetricType(const std::string& type_name) {
auto real_name = to_lower_copy(type_name);
AssertInfo(map.left.count(real_name), "metric type not found: (" + type_name + ")");
AssertInfo(map.left.count(real_name), "metric type not found: " + type_name);
return map.left.at(real_name);
}

View File

@ -13,8 +13,6 @@
#include "utils/Types.h"
#include <faiss/MetricType.h>
#include <string>
#include <boost/align/aligned_allocator.hpp>
#include <vector>
namespace milvus {
using Timestamp = uint64_t; // TODO: use TiKV-like timestamp
@ -26,15 +24,4 @@ using MetricType = faiss::MetricType;
faiss::MetricType
GetMetricType(const std::string& type);
// NOTE: dependent type
// used at meta-template programming
template <class...>
constexpr std::true_type always_true{};
template <class...>
constexpr std::false_type always_false{};
template <typename T>
using aligned_vector = std::vector<T, boost::alignment::aligned_allocator<T, 512>>;
} // namespace milvus

View File

@ -70,6 +70,8 @@ to_lower(const std::string& raw) {
return data;
}
template <class...>
constexpr std::false_type always_false{};
template <typename T>
std::unique_ptr<Expr>
ParseRangeNodeImpl(const Schema& schema, const std::string& field_name, const Json& body) {
@ -83,62 +85,31 @@ ParseRangeNodeImpl(const Schema& schema, const std::string& field_name, const Js
AssertInfo(RangeExpr::mapping_.count(op_name), "op(" + op_name + ") not found");
auto op = RangeExpr::mapping_.at(op_name);
if constexpr (std::is_same_v<T, bool>) {
Assert(item.value().is_boolean());
} else if constexpr (std::is_integral_v<T>) {
if constexpr (std::is_integral_v<T>) {
Assert(item.value().is_number_integer());
} else if constexpr (std::is_floating_point_v<T>) {
Assert(item.value().is_number());
} else {
static_assert(always_false<T>, "unsupported type");
__builtin_unreachable();
}
T value = item.value();
expr->conditions_.emplace_back(op, value);
}
std::sort(expr->conditions_.begin(), expr->conditions_.end());
return expr;
}
// Builds a TermExprImpl<T> from a DSL "term" body for one scalar field.
// Each listed value is validated against T's JSON category (bool /
// integer / number), collected into terms_, and sorted so membership
// tests can later use binary search.
template <typename T>
std::unique_ptr<Expr>
ParseTermNodeImpl(const Schema& schema, const std::string& field_name, const Json& body) {
    auto expr = std::make_unique<TermExprImpl<T>>();
    auto data_type = schema[field_name].get_data_type();
    Assert(body.is_array());
    expr->field_id_ = field_name;
    expr->data_type_ = data_type;
    for (auto& value : body) {
        // Validate the JSON value category matches the instantiated T.
        if constexpr (std::is_same_v<T, bool>) {
            Assert(value.is_boolean());
        } else if constexpr (std::is_integral_v<T>) {
            Assert(value.is_number_integer());
        } else if constexpr (std::is_floating_point_v<T>) {
            Assert(value.is_number());
        } else {
            // Compile-time guard: any other T is a programming error.
            static_assert(always_false<T>, "unsupported type");
            __builtin_unreachable();
        }
        T real_value = value;
        expr->terms_.push_back(real_value);
    }
    // Sorted order is required by the std::binary_search in the executor.
    std::sort(expr->terms_.begin(), expr->terms_.end());
    return expr;
}
std::unique_ptr<Expr>
ParseRangeNode(const Schema& schema, const Json& out_body) {
Assert(out_body.is_object());
Assert(out_body.size() == 1);
auto out_iter = out_body.begin();
auto field_name = out_iter.key();
auto body = out_iter.value();
auto data_type = schema[field_name].get_data_type();
Assert(!field_is_vector(data_type));
switch (data_type) {
case DataType::BOOL: {
return ParseRangeNodeImpl<bool>(schema, field_name, body);
PanicInfo("bool is not supported in Range node");
// return ParseRangeNodeImpl<bool>(schema, field_name, body);
}
case DataType::INT8:
return ParseRangeNodeImpl<int8_t>(schema, field_name, body);
@ -157,42 +128,6 @@ ParseRangeNode(const Schema& schema, const Json& out_body) {
}
}
// Dispatches a DSL "term" node to ParseTermNodeImpl<T> according to the
// field's declared data type. The body must contain exactly one
// field-name -> values entry, and the field must be scalar (non-vector).
static std::unique_ptr<Expr>
ParseTermNode(const Schema& schema, const Json& out_body) {
    Assert(out_body.size() == 1);
    auto out_iter = out_body.begin();
    auto field_name = out_iter.key();
    auto body = out_iter.value();
    auto data_type = schema[field_name].get_data_type();
    // Term predicates only make sense on scalar columns.
    Assert(!field_is_vector(data_type));
    switch (data_type) {
        case DataType::BOOL: {
            return ParseTermNodeImpl<bool>(schema, field_name, body);
        }
        case DataType::INT8: {
            return ParseTermNodeImpl<int8_t>(schema, field_name, body);
        }
        case DataType::INT16: {
            return ParseTermNodeImpl<int16_t>(schema, field_name, body);
        }
        case DataType::INT32: {
            return ParseTermNodeImpl<int32_t>(schema, field_name, body);
        }
        case DataType::INT64: {
            return ParseTermNodeImpl<int64_t>(schema, field_name, body);
        }
        case DataType::FLOAT: {
            return ParseTermNodeImpl<float>(schema, field_name, body);
        }
        case DataType::DOUBLE: {
            return ParseTermNodeImpl<double>(schema, field_name, body);
        }
        default: {
            PanicInfo("unsupported data_type");
        }
    }
}
static std::unique_ptr<Plan>
CreatePlanImplNaive(const Schema& schema, const std::string& dsl_str) {
auto plan = std::make_unique<Plan>(schema);
@ -208,10 +143,6 @@ CreatePlanImplNaive(const Schema& schema, const std::string& dsl_str) {
if (pack.contains("vector")) {
auto& out_body = pack.at("vector");
plan->plan_node_ = ParseVecNode(plan.get(), out_body);
} else if (pack.contains("term")) {
AssertInfo(!predicate, "unsupported complex DSL");
auto& out_body = pack.at("term");
predicate = ParseTermNode(schema, out_body);
} else if (pack.contains("range")) {
AssertInfo(!predicate, "unsupported complex DSL");
auto& out_body = pack.at("range");

View File

@ -20,6 +20,7 @@
#include <map>
#include <string>
#include <vector>
#include <boost/align/aligned_allocator.hpp>
namespace milvus::query {
using Json = nlohmann::json;
@ -38,6 +39,9 @@ struct Plan {
// TODO: add move extra info
};
template <typename T>
using aligned_vector = std::vector<T, boost::alignment::aligned_allocator<T, 512>>;
struct Placeholder {
// milvus::proto::service::PlaceholderGroup group_;
std::string tag_;

View File

@ -27,7 +27,7 @@ create_bitmap_view(std::optional<const BitmapSimple*> bitmaps_opt, int64_t chunk
return nullptr;
}
auto& bitmaps = *bitmaps_opt.value();
auto src_vec = ~bitmaps.at(chunk_id);
auto& src_vec = bitmaps.at(chunk_id);
auto dst = std::make_shared<faiss::ConcurrentBitset>(src_vec.size());
auto iter = reinterpret_cast<BitmapChunk::block_type*>(dst->mutable_data());

View File

@ -58,10 +58,6 @@ class ExecExprVisitor : ExprVisitor {
auto
ExecRangeVisitorDispatcher(RangeExpr& expr_raw) -> RetType;
template <typename T>
auto
ExecTermVisitorImpl(TermExpr& expr_raw) -> RetType;
private:
segcore::SegmentSmallIndex& segment_;
std::optional<RetType> ret_;

View File

@ -46,10 +46,6 @@ class ExecExprVisitor : ExprVisitor {
auto
ExecRangeVisitorDispatcher(RangeExpr& expr_raw) -> RetType;
template <typename T>
auto
ExecTermVisitorImpl(TermExpr& expr_raw) -> RetType;
private:
segcore::SegmentSmallIndex& segment_;
std::optional<RetType> ret_;
@ -67,6 +63,11 @@ ExecExprVisitor::visit(BoolBinaryExpr& expr) {
PanicInfo("unimplemented");
}
void
ExecExprVisitor::visit(TermExpr& expr) {
PanicInfo("unimplemented");
}
template <typename T, typename IndexFunc, typename ElementFunc>
auto
ExecExprVisitor::ExecRangeVisitorImpl(RangeExprImpl<T>& expr, IndexFunc index_func, ElementFunc element_func)
@ -83,17 +84,17 @@ ExecExprVisitor::ExecRangeVisitorImpl(RangeExprImpl<T>& expr, IndexFunc index_fu
auto& indexing_record = segment_.get_indexing_record();
const segcore::ScalarIndexingEntry<T>& entry = indexing_record.get_scalar_entry<T>(field_offset);
RetType results(vec.num_chunk());
RetType results(vec.chunk_size());
auto indexing_barrier = indexing_record.get_finished_ack();
for (auto chunk_id = 0; chunk_id < indexing_barrier; ++chunk_id) {
auto& result = results[chunk_id];
auto indexing = entry.get_indexing(chunk_id);
auto data = index_func(indexing);
result = std::move(*data);
result = ~std::move(*data);
Assert(result.size() == segcore::DefaultElementPerChunk);
}
for (auto chunk_id = indexing_barrier; chunk_id < vec.num_chunk(); ++chunk_id) {
for (auto chunk_id = indexing_barrier; chunk_id < vec.chunk_size(); ++chunk_id) {
auto& result = results[chunk_id];
result.resize(segcore::DefaultElementPerChunk);
auto chunk = vec.get_chunk(chunk_id);
@ -125,32 +126,32 @@ ExecExprVisitor::ExecRangeVisitorDispatcher(RangeExpr& expr_raw) -> RetType {
switch (op) {
case OpType::Equal: {
auto index_func = [val](Index* index) { return index->In(1, &val); };
return ExecRangeVisitorImpl(expr, index_func, [val](T x) { return (x == val); });
return ExecRangeVisitorImpl(expr, index_func, [val](T x) { return !(x == val); });
}
case OpType::NotEqual: {
auto index_func = [val](Index* index) { return index->NotIn(1, &val); };
return ExecRangeVisitorImpl(expr, index_func, [val](T x) { return (x != val); });
return ExecRangeVisitorImpl(expr, index_func, [val](T x) { return !(x != val); });
}
case OpType::GreaterEqual: {
auto index_func = [val](Index* index) { return index->Range(val, Operator::GE); };
return ExecRangeVisitorImpl(expr, index_func, [val](T x) { return (x >= val); });
return ExecRangeVisitorImpl(expr, index_func, [val](T x) { return !(x >= val); });
}
case OpType::GreaterThan: {
auto index_func = [val](Index* index) { return index->Range(val, Operator::GT); };
return ExecRangeVisitorImpl(expr, index_func, [val](T x) { return (x > val); });
return ExecRangeVisitorImpl(expr, index_func, [val](T x) { return !(x > val); });
}
case OpType::LessEqual: {
auto index_func = [val](Index* index) { return index->Range(val, Operator::LE); };
return ExecRangeVisitorImpl(expr, index_func, [val](T x) { return (x <= val); });
return ExecRangeVisitorImpl(expr, index_func, [val](T x) { return !(x <= val); });
}
case OpType::LessThan: {
auto index_func = [val](Index* index) { return index->Range(val, Operator::LT); };
return ExecRangeVisitorImpl(expr, index_func, [val](T x) { return (x < val); });
return ExecRangeVisitorImpl(expr, index_func, [val](T x) { return !(x < val); });
}
default: {
PanicInfo("unsupported range node");
@ -166,16 +167,16 @@ ExecExprVisitor::ExecRangeVisitorDispatcher(RangeExpr& expr_raw) -> RetType {
if (false) {
} else if (ops == std::make_tuple(OpType::GreaterThan, OpType::LessThan)) {
auto index_func = [val1, val2](Index* index) { return index->Range(val1, false, val2, false); };
return ExecRangeVisitorImpl(expr, index_func, [val1, val2](T x) { return (val1 < x && x < val2); });
return ExecRangeVisitorImpl(expr, index_func, [val1, val2](T x) { return !(val1 < x && x < val2); });
} else if (ops == std::make_tuple(OpType::GreaterThan, OpType::LessEqual)) {
auto index_func = [val1, val2](Index* index) { return index->Range(val1, false, val2, true); };
return ExecRangeVisitorImpl(expr, index_func, [val1, val2](T x) { return (val1 < x && x <= val2); });
return ExecRangeVisitorImpl(expr, index_func, [val1, val2](T x) { return !(val1 < x && x <= val2); });
} else if (ops == std::make_tuple(OpType::GreaterEqual, OpType::LessThan)) {
auto index_func = [val1, val2](Index* index) { return index->Range(val1, true, val2, false); };
return ExecRangeVisitorImpl(expr, index_func, [val1, val2](T x) { return (val1 <= x && x < val2); });
return ExecRangeVisitorImpl(expr, index_func, [val1, val2](T x) { return !(val1 <= x && x < val2); });
} else if (ops == std::make_tuple(OpType::GreaterEqual, OpType::LessEqual)) {
auto index_func = [val1, val2](Index* index) { return index->Range(val1, true, val2, true); };
return ExecRangeVisitorImpl(expr, index_func, [val1, val2](T x) { return (val1 <= x && x <= val2); });
return ExecRangeVisitorImpl(expr, index_func, [val1, val2](T x) { return !(val1 <= x && x <= val2); });
} else {
PanicInfo("unsupported range node");
}
@ -225,79 +226,4 @@ ExecExprVisitor::visit(RangeExpr& expr) {
ret_ = std::move(ret);
}
// Evaluates a term (set-membership) predicate over every chunk of the
// column, producing one bitset per chunk. Relies on expr.terms_ being
// sorted (see ParseTermNodeImpl) so std::binary_search is valid.
template <typename T>
auto
ExecExprVisitor::ExecTermVisitorImpl(TermExpr& expr_raw) -> RetType {
    auto& expr = static_cast<TermExprImpl<T>&>(expr_raw);
    auto& records = segment_.get_insert_record();
    auto data_type = expr.data_type_;
    auto& schema = segment_.get_schema();
    auto field_offset_opt = schema.get_offset(expr.field_id_);
    Assert(field_offset_opt);
    auto field_offset = field_offset_opt.value();
    auto& field_meta = schema[field_offset];
    auto vec_ptr = records.get_entity<T>(field_offset);
    auto& vec = *vec_ptr;
    auto num_chunk = vec.num_chunk();
    RetType bitsets;
    // N = number of acknowledged (fully inserted) rows; the final chunk
    // may be only partially filled up to N.
    auto N = records.ack_responder_.GetAck();
    // small batch
    for (int64_t chunk_id = 0; chunk_id < num_chunk; ++chunk_id) {
        auto& chunk = vec.get_chunk(chunk_id);
        // Last chunk holds the remainder; earlier chunks are full.
        auto size = chunk_id == num_chunk - 1 ? N - chunk_id * segcore::DefaultElementPerChunk
                                              : segcore::DefaultElementPerChunk;
        // Bitset is sized to the full chunk; slots past `size` stay false.
        boost::dynamic_bitset<> bitset(segcore::DefaultElementPerChunk);
        for (int i = 0; i < size; ++i) {
            auto value = chunk[i];
            bool is_in = std::binary_search(expr.terms_.begin(), expr.terms_.end(), value);
            bitset[i] = is_in;
        }
        bitsets.emplace_back(std::move(bitset));
    }
    return bitsets;
}
// Visitor entry point for term expressions: checks the expression's data
// type against the schema, then dispatches to the typed implementation.
// The result bitsets are stored in ret_ for the caller to collect.
void
ExecExprVisitor::visit(TermExpr& expr) {
    auto& field_meta = segment_.get_schema()[expr.field_id_];
    Assert(expr.data_type_ == field_meta.get_data_type());
    RetType ret;
    switch (expr.data_type_) {
        case DataType::BOOL: {
            ret = ExecTermVisitorImpl<bool>(expr);
            break;
        }
        case DataType::INT8: {
            ret = ExecTermVisitorImpl<int8_t>(expr);
            break;
        }
        case DataType::INT16: {
            ret = ExecTermVisitorImpl<int16_t>(expr);
            break;
        }
        case DataType::INT32: {
            ret = ExecTermVisitorImpl<int32_t>(expr);
            break;
        }
        case DataType::INT64: {
            ret = ExecTermVisitorImpl<int64_t>(expr);
            break;
        }
        case DataType::FLOAT: {
            ret = ExecTermVisitorImpl<float>(expr);
            break;
        }
        case DataType::DOUBLE: {
            ret = ExecTermVisitorImpl<double>(expr);
            break;
        }
        default:
            PanicInfo("unsupported");
    }
    ret_ = std::move(ret);
}
} // namespace milvus::query

View File

@ -196,7 +196,7 @@ class ConcurrentVectorImpl : public VectorBase {
}
ssize_t
num_chunk() const {
chunk_size() const {
return chunks_.size();
}

View File

@ -24,7 +24,7 @@ VecIndexingEntry::BuildIndexRange(int64_t ack_beg, int64_t ack_end, const Vector
auto source = dynamic_cast<const ConcurrentVector<FloatVector>*>(vec_base);
Assert(source);
auto chunk_size = source->num_chunk();
auto chunk_size = source->chunk_size();
assert(ack_end <= chunk_size);
auto conf = get_build_conf();
data_.grow_to_at_least(ack_end);
@ -87,7 +87,7 @@ void
ScalarIndexingEntry<T>::BuildIndexRange(int64_t ack_beg, int64_t ack_end, const VectorBase* vec_base) {
auto source = dynamic_cast<const ConcurrentVector<T>*>(vec_base);
Assert(source);
auto chunk_size = source->num_chunk();
auto chunk_size = source->chunk_size();
assert(ack_end <= chunk_size);
data_.grow_to_at_least(ack_end);
for (int chunk_id = ack_beg; chunk_id < ack_end; chunk_id++) {

View File

@ -467,16 +467,16 @@ SegmentNaive::BuildVecIndexImpl(const IndexMeta::Entry& entry) {
auto dim = field.get_dim();
auto indexing = knowhere::VecIndexFactory::GetInstance().CreateVecIndex(entry.type, entry.mode);
auto chunk_size = record_.uids_.num_chunk();
auto chunk_size = record_.uids_.chunk_size();
auto& uids = record_.uids_;
auto entities = record_.get_entity<FloatVector>(offset);
std::vector<knowhere::DatasetPtr> datasets;
for (int chunk_id = 0; chunk_id < uids.num_chunk(); ++chunk_id) {
for (int chunk_id = 0; chunk_id < uids.chunk_size(); ++chunk_id) {
auto entities_chunk = entities->get_chunk(chunk_id).data();
int64_t count = chunk_id == uids.num_chunk() - 1 ? record_.reserved - chunk_id * DefaultElementPerChunk
: DefaultElementPerChunk;
int64_t count = chunk_id == uids.chunk_size() - 1 ? record_.reserved - chunk_id * DefaultElementPerChunk
: DefaultElementPerChunk;
datasets.push_back(knowhere::GenDataset(count, dim, entities_chunk));
}
for (auto& ds : datasets) {

View File

@ -241,10 +241,10 @@ SegmentSmallIndex::BuildVecIndexImpl(const IndexMeta::Entry& entry) {
auto entities = record_.get_entity<FloatVector>(offset);
std::vector<knowhere::DatasetPtr> datasets;
for (int chunk_id = 0; chunk_id < uids.num_chunk(); ++chunk_id) {
for (int chunk_id = 0; chunk_id < uids.chunk_size(); ++chunk_id) {
auto entities_chunk = entities->get_chunk(chunk_id).data();
int64_t count = chunk_id == uids.num_chunk() - 1 ? record_.reserved - chunk_id * DefaultElementPerChunk
: DefaultElementPerChunk;
int64_t count = chunk_id == uids.chunk_size() - 1 ? record_.reserved - chunk_id * DefaultElementPerChunk
: DefaultElementPerChunk;
datasets.push_back(knowhere::GenDataset(count, dim, entities_chunk));
}
for (auto& ds : datasets) {

View File

@ -26,5 +26,4 @@ target_link_libraries(all_tests
pthread
milvus_utils
)
install (TARGETS all_tests DESTINATION unittest)

View File

@ -1,12 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <gtest/gtest.h>

View File

@ -52,7 +52,7 @@ TEST(ConcurrentVector, TestSingle) {
c_vec.set_data(total_count, vec.data(), insert_size);
total_count += insert_size;
}
ASSERT_EQ(c_vec.num_chunk(), (total_count + 31) / 32);
ASSERT_EQ(c_vec.chunk_size(), (total_count + 31) / 32);
for (int i = 0; i < total_count; ++i) {
for (int d = 0; d < dim; ++d) {
auto std_data = d + i * dim;

View File

@ -321,88 +321,7 @@ TEST(Expr, TestRange) {
auto ans = final[vec_id][offset];
auto val = age_col[i];
auto ref = ref_func(val);
ASSERT_EQ(ans, ref) << clause << "@" << i << "!!" << val;
}
}
}
TEST(Expr, TestTerm) {
using namespace milvus::query;
using namespace milvus::segcore;
auto vec_2k_3k = [] {
std::string buf = "[";
for (int i = 2000; i < 3000 - 1; ++i) {
buf += std::to_string(i) + ", ";
}
buf += std::to_string(2999) + "]";
return buf;
}();
std::vector<std::tuple<std::string, std::function<bool(int)>>> testcases = {
{R"([2000, 3000])", [](int v) { return v == 2000 || v == 3000; }},
{R"([2000])", [](int v) { return v == 2000; }},
{R"([3000])", [](int v) { return v == 3000; }},
{vec_2k_3k, [](int v) { return 2000 <= v && v < 3000; }},
};
std::string dsl_string_tmp = R"(
{
"bool": {
"must": [
{
"term": {
"age": @@@@
}
},
{
"vector": {
"fakevec": {
"metric_type": "L2",
"params": {
"nprobe": 10
},
"query": "$0",
"topk": 10
}
}
}
]
}
})";
auto schema = std::make_shared<Schema>();
schema->AddField("fakevec", DataType::VECTOR_FLOAT, 16, MetricType::METRIC_L2);
schema->AddField("age", DataType::INT32);
auto seg = CreateSegment(schema);
int N = 10000;
std::vector<int> age_col;
int num_iters = 100;
for (int iter = 0; iter < num_iters; ++iter) {
auto raw_data = DataGen(schema, N, iter);
auto new_age_col = raw_data.get_col<int>(1);
age_col.insert(age_col.end(), new_age_col.begin(), new_age_col.end());
seg->PreInsert(N);
seg->Insert(iter * N, N, raw_data.row_ids_.data(), raw_data.timestamps_.data(), raw_data.raw_);
}
auto seg_promote = dynamic_cast<SegmentSmallIndex*>(seg.get());
ExecExprVisitor visitor(*seg_promote);
for (auto [clause, ref_func] : testcases) {
auto loc = dsl_string_tmp.find("@@@@");
auto dsl_string = dsl_string_tmp;
dsl_string.replace(loc, 4, clause);
auto plan = CreatePlan(*schema, dsl_string);
auto final = visitor.call_child(*plan->plan_node_->predicate_.value());
EXPECT_EQ(final.size(), upper_div(N * num_iters, DefaultElementPerChunk));
for (int i = 0; i < N * num_iters; ++i) {
auto vec_id = i / DefaultElementPerChunk;
auto offset = i % DefaultElementPerChunk;
auto ans = final[vec_id][offset];
auto val = age_col[i];
auto ref = ref_func(val);
auto ref = !ref_func(val);
ASSERT_EQ(ans, ref) << clause << "@" << i << "!!" << val;
}
}

View File

@ -31,14 +31,6 @@ struct GeneratedData {
memcpy(ret.data(), target.data(), target.size());
return ret;
}
template <typename T>
auto
get_mutable_col(int index) {
auto& target = cols_.at(index);
assert(target.size() == row_ids_.size() * sizeof(T));
auto ptr = reinterpret_cast<T*>(target.data());
return ptr;
}
private:
GeneratedData() = default;
@ -66,9 +58,6 @@ GeneratedData::generate_rows(int N, SchemaPtr schema) {
}
}
rows_ = std::move(result);
raw_.raw_data = rows_.data();
raw_.sizeof_per_row = schema->get_total_sizeof();
raw_.count = N;
}
inline GeneratedData
@ -140,12 +129,14 @@ DataGen(SchemaPtr schema, int64_t N, uint64_t seed = 42) {
}
GeneratedData res;
res.cols_ = std::move(cols);
res.generate_rows(N, schema);
for (int i = 0; i < N; ++i) {
res.row_ids_.push_back(i);
res.timestamps_.push_back(i);
}
res.generate_rows(N, schema);
res.raw_.raw_data = res.rows_.data();
res.raw_.sizeof_per_row = schema->get_total_sizeof();
res.raw_.count = N;
return std::move(res);
}

View File

@ -1,96 +0,0 @@
package storage
import (
"bytes"
"encoding/binary"
"strconv"
"github.com/zilliztech/milvus-distributed/internal/errors"
)
// BinlogReader sequentially decodes events out of a raw binlog buffer:
// magic number first, then the descriptor event, then data events.
type BinlogReader struct {
	magicNumber int32
	descriptorEvent
	currentEventReader *EventReader  // reader for the event currently open, nil between events
	buffer             *bytes.Buffer // unread remainder of the binlog bytes
	currentOffset      int32         // absolute offset of the read cursor in the original data
	isClose            bool          // set once Close() has run
}
// NextEventReader closes the current event reader (if any), advances the
// buffer past the remainder of that event, and returns a reader for the
// next event. It returns (nil, nil) when the end of the binlog is reached.
func (reader *BinlogReader) NextEventReader() (*EventReader, error) {
	if reader.isClose {
		return nil, errors.New("bin log reader is closed")
	}
	if reader.currentEventReader != nil {
		// The finished event recorded where the next event starts.
		reader.currentOffset = reader.currentEventReader.nextPosition
		if err := reader.currentEventReader.Close(); err != nil {
			return nil, err
		}
		reader.currentEventReader = nil
		// NOTE(review): Cap() is the buffer's capacity, not the original
		// data length; using it as the end-of-data sentinel only works if
		// bytes.NewBuffer never over-allocated — confirm.
		if reader.currentOffset >= int32(reader.buffer.Cap()) {
			return nil, nil
		}
		// skip remaining bytes of this event
		// NOTE(review): Len() is the count of UNREAD bytes; the skip count
		// `currentOffset - Len()` assumes a specific relation between the
		// absolute offset and the unread count — verify this arithmetic.
		reader.buffer.Next(int(reader.currentOffset) - reader.buffer.Len())
	}
	if reader.currentOffset >= int32(reader.buffer.Cap()) {
		return nil, nil
	}
	eventReader, err := newEventReader(reader.descriptorEvent.payloadDataType, reader.buffer)
	if err != nil {
		return nil, err
	}
	reader.currentEventReader = eventReader
	return reader.currentEventReader, nil
}
// readMagicNumber reads and validates the 4-byte little-endian magic
// number at the start of the binlog, advancing currentOffset past it.
// It returns the magic number read, or an error on decode/mismatch.
func (reader *BinlogReader) readMagicNumber() (int32, error) {
	// binary.Read requires a pointer destination; the previous code passed
	// reader.magicNumber by value, which always fails with
	// "binary.Read: invalid type int32" and never fills the field.
	if err := binary.Read(reader.buffer, binary.LittleEndian, &reader.magicNumber); err != nil {
		return -1, err
	}
	reader.currentOffset = 4
	if reader.magicNumber != MagicNumber {
		return -1, errors.New("parse magic number failed, expected: " + strconv.Itoa(MagicNumber) +
			", actual: " + strconv.Itoa(int(reader.magicNumber)))
	}
	return reader.magicNumber, nil
}
// readDescriptorEvent parses the descriptor event that follows the magic
// number, stores it on the reader, and advances currentOffset to the end
// of the descriptor.
func (reader *BinlogReader) readDescriptorEvent() (*descriptorEvent, error) {
	event, err := ReadDescriptorEvent(reader.buffer)
	if err != nil {
		return nil, err
	}
	reader.descriptorEvent = *event
	// Previously currentOffset was taken from reader.descriptorEvent
	// BEFORE the freshly-read event was assigned (and before the error
	// check), so it captured the zero value instead of the real position.
	reader.currentOffset = reader.descriptorEvent.nextPosition
	return &reader.descriptorEvent, nil
}
// Close releases the reader and its in-flight event reader, if any.
// Calling Close more than once is a harmless no-op.
func (reader *BinlogReader) Close() error {
	if reader.isClose {
		return nil
	}
	reader.isClose = true
	if reader.currentEventReader != nil {
		if err := reader.currentEventReader.Close(); err != nil {
			return err
		}
	}
	return nil
}
// NewBinlogReader wraps raw binlog bytes in a BinlogReader, eagerly
// validating the magic number and parsing the descriptor event so that
// the returned reader is positioned at the first data event.
func NewBinlogReader(data []byte) (*BinlogReader, error) {
	reader := &BinlogReader{
		buffer:  bytes.NewBuffer(data),
		isClose: false,
	}
	if _, err := reader.readMagicNumber(); err != nil {
		return nil, err
	}
	if _, err := reader.readDescriptorEvent(); err != nil {
		return nil, err
	}
	return reader, nil
}

View File

@ -1,299 +0,0 @@
package storage
import (
"bytes"
"encoding/binary"
"errors"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
)
const (
	// todo : put to param table
	ServerID      = 1
	BinlogVersion = 1
	CommitID      = 1
	ServerVersion = 1
	HeaderLength  = 17
)

// BinlogType distinguishes which kind of operations a binlog records.
type BinlogType int32

const (
	InsertBinlog BinlogType = iota
	DeleteBinlog
	DDLBinlog
)

const (
	// MagicNumber is the 4-byte marker written at the start of every binlog.
	MagicNumber = 0xfffabc
)

// baseBinlogWriter holds the state shared by the insert/delete/DDL binlog
// writers: the descriptor event, finished event writers, the event writer
// currently being filled, and running event/row counters.
type baseBinlogWriter struct {
	descriptorEvent
	magicNumber        int32
	binlogType         BinlogType
	eventWriters       []EventWriter // finished events, serialized by Close()
	currentEventWriter EventWriter   // event being filled; nil between events
	buffer             *bytes.Buffer // output; allocated lazily by Close()
	numEvents          int32
	numRows            int32
	isClose            bool
	offset             int32 // byte offset where the next event will start
}
// checkClose returns an error if the writer has already been closed,
// guarding every Next*EventWriter call.
func (writer *baseBinlogWriter) checkClose() error {
	if writer.isClose {
		// The message previously said "insert binlog writer", but this
		// check is shared by insert, delete and DDL writers alike.
		return errors.New("binlog writer is already closed")
	}
	return nil
}
// appendEventWriter finalizes the in-flight event writer, if any: the
// event is finished, moved into eventWriters, and the byte offset plus
// event/row counters are advanced. Safe to call when no event is open.
func (writer *baseBinlogWriter) appendEventWriter() error {
	if writer.currentEventWriter != nil {
		if err := writer.currentEventWriter.Finish(); err != nil {
			return err
		}
		writer.eventWriters = append(writer.eventWriters, writer.currentEventWriter)
		length, err := writer.currentEventWriter.GetMemoryUsageInBytes()
		if err != nil {
			return err
		}
		// The next event starts right after this one's serialized bytes.
		writer.offset += length
		writer.numEvents++
		nums, err := writer.currentEventWriter.GetPayloadLengthFromWriter()
		if err != nil {
			return err
		}
		writer.numRows += int32(nums)
		writer.currentEventWriter = nil
	}
	return nil
}
// GetEventNums returns how many events have been finalized so far
// (an event still being filled is not counted).
func (writer *baseBinlogWriter) GetEventNums() int32 {
	return writer.numEvents
}
// GetRowNums returns the total number of rows written so far, including
// rows in the event writer that is still open.
func (writer *baseBinlogWriter) GetRowNums() (int32, error) {
	var res = writer.numRows
	if writer.currentEventWriter != nil {
		nums, err := writer.currentEventWriter.GetPayloadLengthFromWriter()
		if err != nil {
			// Previously this error was silently swallowed (empty if-body)
			// and the possibly-garbage count was added to the result.
			return res, err
		}
		res += int32(nums)
	}
	return res, nil
}
// GetBinlogType reports whether this is an insert, delete or DDL binlog.
func (writer *baseBinlogWriter) GetBinlogType() BinlogType {
	return writer.binlogType
}

// GetBuffer get binlog buffer. Return nil if binlog is not finished yet
// (the buffer is only allocated by Close()).
func (writer *baseBinlogWriter) GetBuffer() []byte {
	if writer.buffer != nil {
		return writer.buffer.Bytes()
	}
	return nil
}
// Close finalizes the binlog: it flushes the in-flight event writer,
// allocates the output buffer, serializes magic number + descriptor
// event + every finished event into it, then releases all event writers.
// Calling Close again is a no-op.
func (writer *baseBinlogWriter) Close() error {
	if writer.isClose {
		return nil
	}
	writer.isClose = true
	if err := writer.appendEventWriter(); err != nil {
		return err
	}
	writer.buffer = new(bytes.Buffer)
	if err := binary.Write(writer.buffer, binary.LittleEndian, int32(MagicNumber)); err != nil {
		return err
	}
	if err := writer.descriptorEvent.Write(writer.buffer); err != nil {
		return err
	}
	for _, w := range writer.eventWriters {
		if err := w.Write(writer.buffer); err != nil {
			return err
		}
	}
	// close all writers
	for _, e := range writer.eventWriters {
		if err := e.Close(); err != nil {
			return err
		}
	}
	return nil
}
// InsertBinlogWriter records insert events.
type InsertBinlogWriter struct {
	baseBinlogWriter
}

// NextInsertEventWriter finalizes the current event (if any) and opens a
// writer for a new insert event positioned at the current byte offset.
func (writer *InsertBinlogWriter) NextInsertEventWriter() (*insertEventWriter, error) {
	if err := writer.checkClose(); err != nil {
		return nil, err
	}
	if err := writer.appendEventWriter(); err != nil {
		return nil, err
	}
	event, err := newInsertEventWriter(writer.payloadDataType, writer.offset)
	if err != nil {
		return nil, err
	}
	writer.currentEventWriter = event
	return event, nil
}
// DeleteBinlogWriter records delete events.
type DeleteBinlogWriter struct {
	baseBinlogWriter
}

// NextDeleteEventWriter finalizes the current event (if any) and opens a
// writer for a new delete event positioned at the current byte offset.
func (writer *DeleteBinlogWriter) NextDeleteEventWriter() (*deleteEventWriter, error) {
	if err := writer.checkClose(); err != nil {
		return nil, err
	}
	if err := writer.appendEventWriter(); err != nil {
		return nil, err
	}
	event, err := newDeleteEventWriter(writer.payloadDataType, writer.offset)
	if err != nil {
		return nil, err
	}
	writer.currentEventWriter = event
	return event, nil
}
// DDLBinlogWriter records DDL events: create/drop collection and
// create/drop partition.
type DDLBinlogWriter struct {
	baseBinlogWriter
}

// NextCreateCollectionEventWriter finalizes the current event (if any)
// and opens a writer for a new CreateCollection event.
func (writer *DDLBinlogWriter) NextCreateCollectionEventWriter() (*createCollectionEventWriter, error) {
	if err := writer.checkClose(); err != nil {
		return nil, err
	}
	if err := writer.appendEventWriter(); err != nil {
		return nil, err
	}
	event, err := newCreateCollectionEventWriter(writer.payloadDataType, writer.offset)
	if err != nil {
		return nil, err
	}
	writer.currentEventWriter = event
	return event, nil
}

// NextDropCollectionEventWriter finalizes the current event (if any)
// and opens a writer for a new DropCollection event.
func (writer *DDLBinlogWriter) NextDropCollectionEventWriter() (*dropCollectionEventWriter, error) {
	if err := writer.checkClose(); err != nil {
		return nil, err
	}
	if err := writer.appendEventWriter(); err != nil {
		return nil, err
	}
	event, err := newDropCollectionEventWriter(writer.payloadDataType, writer.offset)
	if err != nil {
		return nil, err
	}
	writer.currentEventWriter = event
	return event, nil
}

// NextCreatePartitionEventWriter finalizes the current event (if any)
// and opens a writer for a new CreatePartition event.
func (writer *DDLBinlogWriter) NextCreatePartitionEventWriter() (*createPartitionEventWriter, error) {
	if err := writer.checkClose(); err != nil {
		return nil, err
	}
	if err := writer.appendEventWriter(); err != nil {
		return nil, err
	}
	event, err := newCreatePartitionEventWriter(writer.payloadDataType, writer.offset)
	if err != nil {
		return nil, err
	}
	writer.currentEventWriter = event
	return event, nil
}

// NextDropPartitionEventWriter finalizes the current event (if any)
// and opens a writer for a new DropPartition event.
func (writer *DDLBinlogWriter) NextDropPartitionEventWriter() (*dropPartitionEventWriter, error) {
	if err := writer.checkClose(); err != nil {
		return nil, err
	}
	if err := writer.appendEventWriter(); err != nil {
		return nil, err
	}
	event, err := newDropPartitionEventWriter(writer.payloadDataType, writer.offset)
	if err != nil {
		return nil, err
	}
	writer.currentEventWriter = event
	return event, nil
}
// NewInsertBinlogWriter creates an InsertBinlogWriter for payloads of the
// given data type. The initial offset accounts for the 4-byte magic
// number plus the serialized descriptor event.
func NewInsertBinlogWriter(dataType schemapb.DataType) *InsertBinlogWriter {
	descriptorEvent := newDescriptorEvent()
	descriptorEvent.payloadDataType = dataType
	return &InsertBinlogWriter{
		baseBinlogWriter: baseBinlogWriter{
			descriptorEvent:    descriptorEvent,
			magicNumber:        MagicNumber,
			binlogType:         InsertBinlog,
			eventWriters:       make([]EventWriter, 0),
			currentEventWriter: nil,
			buffer:             nil,
			numEvents:          0,
			numRows:            0,
			isClose:            false,
			offset:             4 + descriptorEvent.descriptorEventData.GetMemoryUsageInBytes(),
		},
	}
}
// NewDeleteBinlogWriter creates a DeleteBinlogWriter for payloads of the
// given data type. The initial offset accounts for the 4-byte magic
// number plus the serialized descriptor event.
func NewDeleteBinlogWriter(dataType schemapb.DataType) *DeleteBinlogWriter {
	descriptorEvent := newDescriptorEvent()
	descriptorEvent.payloadDataType = dataType
	return &DeleteBinlogWriter{
		baseBinlogWriter: baseBinlogWriter{
			descriptorEvent:    descriptorEvent,
			magicNumber:        MagicNumber,
			binlogType:         DeleteBinlog,
			eventWriters:       make([]EventWriter, 0),
			currentEventWriter: nil,
			buffer:             nil,
			numEvents:          0,
			numRows:            0,
			isClose:            false,
			offset:             4 + descriptorEvent.descriptorEventData.GetMemoryUsageInBytes(),
		},
	}
}
// NewDDLBinlogWriter creates a binlog writer for DDL events (create/drop
// collection and partition) whose payload column has the given data type.
func NewDDLBinlogWriter(dataType schemapb.DataType) *DDLBinlogWriter {
	de := newDescriptorEvent()
	de.payloadDataType = dataType
	base := baseBinlogWriter{
		descriptorEvent:    de,
		magicNumber:        MagicNumber,
		binlogType:         DDLBinlog,
		eventWriters:       make([]EventWriter, 0),
		currentEventWriter: nil,
		buffer:             nil,
		numEvents:          0,
		numRows:            0,
		isClose:            false,
		// Events start right after the 4-byte magic number and the
		// descriptor event.
		offset: 4 + de.descriptorEventData.GetMemoryUsageInBytes(),
	}
	return &DDLBinlogWriter{baseBinlogWriter: base}
}

View File

@ -1,29 +0,0 @@
package storage
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
)
// TestBinlogWriter covers the basic insert-binlog lifecycle: open an event
// writer, append a payload, close, and verify event/row counts survive Close.
func TestBinlogWriter(t *testing.T) {
	binlogWriter := NewInsertBinlogWriter(schemapb.DataType_INT32)
	defer binlogWriter.Close()
	eventWriter, err := binlogWriter.NextInsertEventWriter()
	assert.Nil(t, err)
	err = eventWriter.AddInt32ToPayload([]int32{1, 2, 3})
	assert.Nil(t, err)
	// Before Close() the binlog has not been serialized, so no buffer exists
	// yet. (The original read `assert.Nil(t, nil, binlogWriter.GetBuffer())`,
	// which asserted that nil is nil and passed the buffer as a message arg,
	// never actually inspecting it.)
	assert.Nil(t, binlogWriter.GetBuffer())
	err = binlogWriter.Close()
	assert.Nil(t, err)
	assert.EqualValues(t, 1, binlogWriter.GetEventNums())
	nums, err := binlogWriter.GetRowNums()
	assert.Nil(t, err)
	assert.EqualValues(t, 3, nums)
	// Close finalizes the event writer; further appends must fail and must
	// not change the recorded row count.
	err = eventWriter.AddInt32ToPayload([]int32{1, 2, 3})
	assert.NotNil(t, err)
	nums, err = binlogWriter.GetRowNums()
	assert.Nil(t, err)
	assert.EqualValues(t, 3, nums)
}

View File

@ -1,347 +0,0 @@
package storage
import (
"bytes"
"encoding/binary"
"io"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
// descriptorEventData is the payload of the descriptor event that opens every
// binlog file: a fixed-width part plus a table of post-header lengths, one
// entry per payload event type.
type descriptorEventData struct {
	descriptorEventDataFixPart
	postHeaderLengths []uint8
}

// descriptorEventDataFixPart is the fixed-width portion of the descriptor
// event data; it is serialized verbatim with binary.Write in declaration
// order, little endian.
type descriptorEventDataFixPart struct {
	binlogVersion   int16
	serverVersion   int64
	commitID        int64
	headerLength    int8
	collectionID    int64
	partitionID     int64
	segmentID       int64
	startTimestamp  typeutil.Timestamp
	endTimestamp    typeutil.Timestamp
	payloadDataType schemapb.DataType
}

// SetStartTimeStamp records the earliest timestamp covered by this binlog.
func (data *descriptorEventData) SetStartTimeStamp(ts typeutil.Timestamp) {
	data.startTimestamp = ts
}

// SetEndTimeStamp records the latest timestamp covered by this binlog.
func (data *descriptorEventData) SetEndTimeStamp(ts typeutil.Timestamp) {
	data.endTimestamp = ts
}

// GetMemoryUsageInBytes returns the serialized size of the descriptor event
// data by performing a throwaway serialization into a scratch buffer.
func (data *descriptorEventData) GetMemoryUsageInBytes() int32 {
	buf := new(bytes.Buffer)
	_ = data.Write(buf)
	return int32(buf.Len())
}

// Write serializes the fixed part followed by the post-header length table.
func (data *descriptorEventData) Write(buffer io.Writer) error {
	if err := binary.Write(buffer, binary.LittleEndian, data.descriptorEventDataFixPart); err != nil {
		return err
	}
	if err := binary.Write(buffer, binary.LittleEndian, data.postHeaderLengths); err != nil {
		return err
	}
	return nil
}

// readDescriptorEventData deserializes descriptor event data from buffer.
// It relies on newDescriptorEventData pre-sizing postHeaderLengths:
// binary.Read fills exactly len(postHeaderLengths) entries.
func readDescriptorEventData(buffer io.Reader) (*descriptorEventData, error) {
	event := newDescriptorEventData()
	if err := binary.Read(buffer, binary.LittleEndian, &event.descriptorEventDataFixPart); err != nil {
		return nil, err
	}
	if err := binary.Read(buffer, binary.LittleEndian, &event.postHeaderLengths); err != nil {
		return nil, err
	}
	return &event, nil
}
// eventData is the common contract of every event type's fixed part.
type eventData interface {
	// GetEventDataSize returns the serialized size of the fixed part in bytes.
	GetEventDataSize() int32
	// WriteEventData serializes the fixed part into buffer.
	WriteEventData(buffer io.Writer) error
}

// all event types' fixed part only have start timestamp and end timestamp yet, but maybe different events will
// have different fields later, so we just create a event data struct per event type.
type insertEventData struct {
	startTimestamp typeutil.Timestamp
	endTimestamp   typeutil.Timestamp
}

// SetStartTimestamp records the earliest timestamp covered by this event.
func (data *insertEventData) SetStartTimestamp(timestamp typeutil.Timestamp) {
	data.startTimestamp = timestamp
}

// SetEndTimestamp records the latest timestamp covered by this event.
func (data *insertEventData) SetEndTimestamp(timestamp typeutil.Timestamp) {
	data.endTimestamp = timestamp
}

// GetEventDataSize returns the serialized size of the fixed part in bytes,
// measured via a throwaway serialization.
func (data *insertEventData) GetEventDataSize() int32 {
	buf := new(bytes.Buffer)
	_ = binary.Write(buf, binary.LittleEndian, data)
	return int32(buf.Len())
}

// WriteEventData serializes the fixed part (startTimestamp, endTimestamp) in
// one binary.Write call, little endian.
func (data *insertEventData) WriteEventData(buffer io.Writer) error {
	return binary.Write(buffer, binary.LittleEndian, data)
}
// deleteEventData is the fixed part of a delete event: the time range it
// covers.
type deleteEventData struct {
	startTimestamp typeutil.Timestamp
	endTimestamp   typeutil.Timestamp
}

// SetStartTimestamp records the earliest timestamp covered by this event.
func (data *deleteEventData) SetStartTimestamp(timestamp typeutil.Timestamp) {
	data.startTimestamp = timestamp
}

// SetEndTimestamp records the latest timestamp covered by this event.
func (data *deleteEventData) SetEndTimestamp(timestamp typeutil.Timestamp) {
	data.endTimestamp = timestamp
}

// GetEventDataSize returns the serialized size of the fixed part in bytes.
func (data *deleteEventData) GetEventDataSize() int32 {
	buf := new(bytes.Buffer)
	_ = binary.Write(buf, binary.LittleEndian, data)
	return int32(buf.Len())
}

// WriteEventData serializes the fixed part. One binary.Write of the whole
// struct emits startTimestamp then endTimestamp little-endian — byte-identical
// to the previous field-by-field writes (GetEventDataSize already serializes
// the struct this way) and consistent with insertEventData.WriteEventData.
func (data *deleteEventData) WriteEventData(buffer io.Writer) error {
	return binary.Write(buffer, binary.LittleEndian, data)
}
// createCollectionEventData is the fixed part of a create-collection event:
// the time range it covers.
type createCollectionEventData struct {
	startTimestamp typeutil.Timestamp
	endTimestamp   typeutil.Timestamp
}

// SetStartTimestamp records the earliest timestamp covered by this event.
func (data *createCollectionEventData) SetStartTimestamp(timestamp typeutil.Timestamp) {
	data.startTimestamp = timestamp
}

// SetEndTimestamp records the latest timestamp covered by this event.
func (data *createCollectionEventData) SetEndTimestamp(timestamp typeutil.Timestamp) {
	data.endTimestamp = timestamp
}

// GetEventDataSize returns the serialized size of the fixed part in bytes.
func (data *createCollectionEventData) GetEventDataSize() int32 {
	buf := new(bytes.Buffer)
	_ = binary.Write(buf, binary.LittleEndian, data)
	return int32(buf.Len())
}

// WriteEventData serializes the fixed part. One binary.Write of the whole
// struct emits the two timestamps little-endian — byte-identical to the
// previous field-by-field writes and consistent with insertEventData.
func (data *createCollectionEventData) WriteEventData(buffer io.Writer) error {
	return binary.Write(buffer, binary.LittleEndian, data)
}
// dropCollectionEventData is the fixed part of a drop-collection event: the
// time range it covers.
type dropCollectionEventData struct {
	startTimestamp typeutil.Timestamp
	endTimestamp   typeutil.Timestamp
}

// SetStartTimestamp records the earliest timestamp covered by this event.
func (data *dropCollectionEventData) SetStartTimestamp(timestamp typeutil.Timestamp) {
	data.startTimestamp = timestamp
}

// SetEndTimestamp records the latest timestamp covered by this event.
func (data *dropCollectionEventData) SetEndTimestamp(timestamp typeutil.Timestamp) {
	data.endTimestamp = timestamp
}

// GetEventDataSize returns the serialized size of the fixed part in bytes.
func (data *dropCollectionEventData) GetEventDataSize() int32 {
	buf := new(bytes.Buffer)
	_ = binary.Write(buf, binary.LittleEndian, data)
	return int32(buf.Len())
}

// WriteEventData serializes the fixed part. One binary.Write of the whole
// struct emits the two timestamps little-endian — byte-identical to the
// previous field-by-field writes and consistent with insertEventData.
func (data *dropCollectionEventData) WriteEventData(buffer io.Writer) error {
	return binary.Write(buffer, binary.LittleEndian, data)
}
// createPartitionEventData is the fixed part of a create-partition event: the
// time range it covers.
type createPartitionEventData struct {
	startTimestamp typeutil.Timestamp
	endTimestamp   typeutil.Timestamp
}

// SetStartTimestamp records the earliest timestamp covered by this event.
func (data *createPartitionEventData) SetStartTimestamp(timestamp typeutil.Timestamp) {
	data.startTimestamp = timestamp
}

// SetEndTimestamp records the latest timestamp covered by this event.
func (data *createPartitionEventData) SetEndTimestamp(timestamp typeutil.Timestamp) {
	data.endTimestamp = timestamp
}

// GetEventDataSize returns the serialized size of the fixed part in bytes.
func (data *createPartitionEventData) GetEventDataSize() int32 {
	buf := new(bytes.Buffer)
	_ = binary.Write(buf, binary.LittleEndian, data)
	return int32(buf.Len())
}

// WriteEventData serializes the fixed part. One binary.Write of the whole
// struct emits the two timestamps little-endian — byte-identical to the
// previous field-by-field writes and consistent with insertEventData.
func (data *createPartitionEventData) WriteEventData(buffer io.Writer) error {
	return binary.Write(buffer, binary.LittleEndian, data)
}
// dropPartitionEventData is the fixed part of a drop-partition event: the
// time range it covers.
type dropPartitionEventData struct {
	startTimestamp typeutil.Timestamp
	endTimestamp   typeutil.Timestamp
}

// SetStartTimestamp records the earliest timestamp covered by this event.
func (data *dropPartitionEventData) SetStartTimestamp(timestamp typeutil.Timestamp) {
	data.startTimestamp = timestamp
}

// SetEndTimestamp records the latest timestamp covered by this event.
func (data *dropPartitionEventData) SetEndTimestamp(timestamp typeutil.Timestamp) {
	data.endTimestamp = timestamp
}

// GetEventDataSize returns the serialized size of the fixed part in bytes.
func (data *dropPartitionEventData) GetEventDataSize() int32 {
	buf := new(bytes.Buffer)
	_ = binary.Write(buf, binary.LittleEndian, data)
	return int32(buf.Len())
}

// WriteEventData serializes the fixed part. One binary.Write of the whole
// struct emits the two timestamps little-endian — byte-identical to the
// previous field-by-field writes and consistent with insertEventData.
func (data *dropPartitionEventData) WriteEventData(buffer io.Writer) error {
	return binary.Write(buffer, binary.LittleEndian, data)
}
// newDescriptorEventData builds descriptor event data with sentinel IDs (-1)
// and the post-header length table for the six payload event types.
func newDescriptorEventData() descriptorEventData {
	data := descriptorEventData{
		descriptorEventDataFixPart: descriptorEventDataFixPart{
			binlogVersion:   BinlogVersion,
			serverVersion:   ServerVersion,
			commitID:        CommitID,
			collectionID:    -1,
			partitionID:     -1,
			segmentID:       -1,
			startTimestamp:  0,
			endTimestamp:    0,
			payloadDataType: -1,
		},
		// One entry per payload event type; presumably 16 is each type's
		// post-header (fixed part) size — TODO confirm against the event
		// data layout.
		postHeaderLengths: []uint8{16, 16, 16, 16, 16, 16},
	}
	// headerLength is an int8; the serialized descriptor data is well under
	// 128 bytes, so the narrowing conversion is safe today.
	data.headerLength = int8(data.GetMemoryUsageInBytes())
	return data
}
// The constructors below return event data with a zeroed time range; callers
// fill in the real start/end timestamps before serialization. Returning the
// zero value is equivalent to the explicit zero-field literals it replaces.

func newInsertEventData() insertEventData {
	return insertEventData{}
}

func newDeleteEventData() deleteEventData {
	return deleteEventData{}
}

func newCreateCollectionEventData() createCollectionEventData {
	return createCollectionEventData{}
}

func newDropCollectionEventData() dropCollectionEventData {
	return dropCollectionEventData{}
}

func newCreatePartitionEventData() createPartitionEventData {
	return createPartitionEventData{}
}

func newDropPartitionEventData() dropPartitionEventData {
	return dropPartitionEventData{}
}
// Each readXxxEventData below decodes one event type's fixed part (two
// little-endian timestamps) from buffer.

func readInsertEventData(buffer io.Reader) (*insertEventData, error) {
	var data insertEventData
	if err := binary.Read(buffer, binary.LittleEndian, &data); err != nil {
		return nil, err
	}
	return &data, nil
}

func readDeleteEventData(buffer io.Reader) (*deleteEventData, error) {
	var data deleteEventData
	if err := binary.Read(buffer, binary.LittleEndian, &data); err != nil {
		return nil, err
	}
	return &data, nil
}

func readCreateCollectionEventData(buffer io.Reader) (*createCollectionEventData, error) {
	var data createCollectionEventData
	if err := binary.Read(buffer, binary.LittleEndian, &data); err != nil {
		return nil, err
	}
	return &data, nil
}

func readDropCollectionEventData(buffer io.Reader) (*dropCollectionEventData, error) {
	var data dropCollectionEventData
	if err := binary.Read(buffer, binary.LittleEndian, &data); err != nil {
		return nil, err
	}
	return &data, nil
}

func readCreatePartitionEventData(buffer io.Reader) (*createPartitionEventData, error) {
	var data createPartitionEventData
	if err := binary.Read(buffer, binary.LittleEndian, &data); err != nil {
		return nil, err
	}
	return &data, nil
}

func readDropPartitionEventData(buffer io.Reader) (*dropPartitionEventData, error) {
	var data dropPartitionEventData
	if err := binary.Read(buffer, binary.LittleEndian, &data); err != nil {
		return nil, err
	}
	return &data, nil
}

View File

@ -1,80 +0,0 @@
package storage
import (
"bytes"
"encoding/binary"
"io"
"time"
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
// baseEventHeader is the fixed-size header that precedes every event in a
// binlog file; it is serialized verbatim with binary.Write, little endian.
type baseEventHeader struct {
	timestamp    typeutil.Timestamp
	typeCode     EventTypeCode
	serverID     int32
	// eventLength is the total serialized event size; nextPosition is the
	// file offset just past this event. Both are filled in by the writer.
	eventLength  int32
	nextPosition int32
}

// GetMemoryUsageInBytes returns the serialized header size via a throwaway
// write; writing a fixed-size struct into a bytes.Buffer cannot fail, so the
// error is deliberately ignored.
func (header *baseEventHeader) GetMemoryUsageInBytes() int32 {
	buf := new(bytes.Buffer)
	binary.Write(buf, binary.LittleEndian, header)
	return int32(buf.Len())
}

// TypeCode reports which event type this header announces.
func (header *baseEventHeader) TypeCode() EventTypeCode {
	return header.typeCode
}

// Write serializes the header into buffer in field order, little endian.
func (header *baseEventHeader) Write(buffer io.Writer) error {
	return binary.Write(buffer, binary.LittleEndian, header)
}

// The descriptor event shares the exact header layout of ordinary events.
type descriptorEventHeader = baseEventHeader

// eventHeader wraps baseEventHeader so ordinary events get a distinct type.
type eventHeader struct {
	baseEventHeader
}
// readEventHeader decodes an ordinary event header from buffer.
func readEventHeader(buffer io.Reader) (*eventHeader, error) {
	var header eventHeader
	if err := binary.Read(buffer, binary.LittleEndian, &header); err != nil {
		return nil, err
	}
	return &header, nil
}

// readDescriptorEventHeader decodes a descriptor event header from buffer.
func readDescriptorEventHeader(buffer io.Reader) (*descriptorEventHeader, error) {
	var header descriptorEventHeader
	if err := binary.Read(buffer, binary.LittleEndian, &header); err != nil {
		return nil, err
	}
	return &header, nil
}
// newDescriptorEventHeader builds a descriptor event header stamped with the
// current physical time. nextPosition points just past the header plus the
// 4-byte magic number that precedes it in the file.
func newDescriptorEventHeader() descriptorEventHeader {
	header := descriptorEventHeader{
		timestamp: tsoutil.ComposeTS(time.Now().UnixNano()/int64(time.Millisecond), 0),
		typeCode:  DescriptorEventType,
		serverID:  ServerID,
	}
	header.eventLength = header.GetMemoryUsageInBytes()
	header.nextPosition = header.eventLength + 4
	return header
}

// newEventHeader builds a header for an ordinary event; eventLength and
// nextPosition stay at -1 until the event writer finishes and knows its size.
func newEventHeader(eventTypeCode EventTypeCode) eventHeader {
	return eventHeader{
		baseEventHeader: baseEventHeader{
			timestamp:    tsoutil.ComposeTS(time.Now().UnixNano()/int64(time.Millisecond), 0),
			typeCode:     eventTypeCode,
			serverID:     ServerID,
			eventLength:  -1,
			nextPosition: -1,
		},
	}
}

View File

@ -1,99 +0,0 @@
package storage
import (
"bytes"
"errors"
"strconv"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
)
// EventReader parses one event back out of a buffer: the header, then the
// type-specific fixed part, then the payload via the embedded
// PayloadReaderInterface.
type EventReader struct {
	eventHeader
	eventData
	PayloadReaderInterface
	buffer   *bytes.Buffer
	isClosed bool
}
// checkClose reports an error when the reader has already been closed.
func (reader *EventReader) checkClose() error {
	if !reader.isClosed {
		return nil
	}
	return errors.New("event reader is closed")
}
// readHeader deserializes the event header from the buffer and caches it on
// the reader.
func (reader *EventReader) readHeader() (*eventHeader, error) {
	if err := reader.checkClose(); err != nil {
		return nil, err
	}
	header, err := readEventHeader(reader.buffer)
	if err != nil {
		return nil, err
	}
	reader.eventHeader = *header
	return &reader.eventHeader, nil
}

// readData deserializes the event's fixed part, dispatching on the type code
// populated by readHeader; unknown or descriptor codes are an error here.
func (reader *EventReader) readData() (eventData, error) {
	if err := reader.checkClose(); err != nil {
		return nil, err
	}
	var data eventData
	var err error
	switch reader.TypeCode() {
	case InsertEventType:
		data, err = readInsertEventData(reader.buffer)
	case DeleteEventType:
		data, err = readDeleteEventData(reader.buffer)
	case CreateCollectionEventType:
		data, err = readCreateCollectionEventData(reader.buffer)
	case DropCollectionEventType:
		data, err = readDropCollectionEventData(reader.buffer)
	case CreatePartitionEventType:
		data, err = readCreatePartitionEventData(reader.buffer)
	case DropPartitionEventType:
		data, err = readDropPartitionEventData(reader.buffer)
	default:
		return nil, errors.New("unknown header type code: " + strconv.Itoa(int(reader.TypeCode())))
	}
	if err != nil {
		return nil, err
	}
	reader.eventData = data
	return reader.eventData, nil
}
// Close releases the underlying payload reader; safe to call more than once.
func (reader *EventReader) Close() error {
	if reader.isClosed {
		return nil
	}
	reader.isClosed = true
	return reader.PayloadReaderInterface.Close()
}
// newEventReader parses one event from buffer: header, then the type-specific
// fixed part, then hands the remaining bytes to a payload reader.
// NOTE(review): buffer.Bytes() returns only the unread remainder, so this
// presumably assumes the payload occupies the rest of the buffer — TODO
// confirm for buffers holding multiple events.
func newEventReader(datatype schemapb.DataType, buffer *bytes.Buffer) (*EventReader, error) {
	reader := &EventReader{
		eventHeader: eventHeader{
			baseEventHeader{},
		},
		buffer:   buffer,
		isClosed: false,
	}
	if _, err := reader.readHeader(); err != nil {
		return nil, err
	}
	if _, err := reader.readData(); err != nil {
		return nil, err
	}
	payloadReader, err := NewPayloadReader(datatype, buffer.Bytes())
	if err != nil {
		return nil, err
	}
	reader.PayloadReaderInterface = payloadReader
	return reader, nil
}

View File

@ -1,289 +0,0 @@
package storage
import (
"bytes"
"encoding/binary"
"io"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
)
// EventTypeCode identifies the kind of event stored in a binlog file.
type EventTypeCode int8

const (
	DescriptorEventType EventTypeCode = iota
	InsertEventType
	DeleteEventType
	CreateCollectionEventType
	DropCollectionEventType
	CreatePartitionEventType
	DropPartitionEventType
)

// String returns the human-readable name of the event type, or "" for a code
// outside the known range.
func (code EventTypeCode) String() string {
	codes := []string{"DescriptorEventType", "InsertEventType", "DeleteEventType", "CreateCollectionEventType", "DropCollectionEventType",
		"CreatePartitionEventType", "DropPartitionEventType"}
	// The original check (len(codes) < int(code)) let code == len(codes) and
	// negative codes index out of range and panic.
	if code < 0 || int(code) >= len(codes) {
		return ""
	}
	return codes[code]
}
// descriptorEvent couples the descriptor header with its data; it is always
// the first event in a binlog file.
type descriptorEvent struct {
	descriptorEventHeader
	descriptorEventData
}

// Write serializes the descriptor event: header first, then data.
func (event *descriptorEvent) Write(buffer io.Writer) error {
	if err := event.descriptorEventHeader.Write(buffer); err != nil {
		return err
	}
	if err := event.descriptorEventData.Write(buffer); err != nil {
		return err
	}
	return nil
}

// ReadDescriptorEvent deserializes a descriptor event (header then data)
// from buffer.
func ReadDescriptorEvent(buffer io.Reader) (*descriptorEvent, error) {
	header, err := readDescriptorEventHeader(buffer)
	if err != nil {
		return nil, err
	}
	data, err := readDescriptorEventData(buffer)
	if err != nil {
		return nil, err
	}
	return &descriptorEvent{
		descriptorEventHeader: *header,
		descriptorEventData:   *data,
	}, nil
}

// EventWriter builds a single event: payload columns are appended through
// PayloadWriterInterface, then the event is finalized and serialized.
type EventWriter interface {
	PayloadWriterInterface
	// Finish set meta in header and no data can be added to event writer
	Finish() error
	// Close release resources
	Close() error
	// Write serialize to buffer, should call Finish first
	Write(buffer *bytes.Buffer) error
	GetMemoryUsageInBytes() (int32, error)
}

// baseEventWriter implements the event-type-independent parts of EventWriter;
// the two function fields are wired by each concrete constructor to that
// event type's fixed-part serialization.
type baseEventWriter struct {
	eventHeader
	PayloadWriterInterface
	isClosed bool
	isFinish bool
	// offset of this event from the start of the binlog file; used by Finish
	// to compute the header's nextPosition.
	offset           int32
	getEventDataSize func() int32
	writeEventData   func(buffer io.Writer) error
}

// GetMemoryUsageInBytes returns the total serialized event size:
// fixed part + header + payload bytes.
func (writer *baseEventWriter) GetMemoryUsageInBytes() (int32, error) {
	data, err := writer.GetPayloadBufferFromWriter()
	if err != nil {
		return -1, err
	}
	return writer.getEventDataSize() + writer.eventHeader.GetMemoryUsageInBytes() +
		int32(len(data)), nil
}

// Write serializes the whole event (header, fixed part, payload) into buffer.
// Callers should call Finish first so the header's length fields are set.
func (writer *baseEventWriter) Write(buffer *bytes.Buffer) error {
	if err := writer.eventHeader.Write(buffer); err != nil {
		return err
	}
	if err := writer.writeEventData(buffer); err != nil {
		return err
	}
	data, err := writer.GetPayloadBufferFromWriter()
	if err != nil {
		return err
	}
	if err := binary.Write(buffer, binary.LittleEndian, data); err != nil {
		return err
	}
	return nil
}

// Finish seals the payload and fills in the header's eventLength and
// nextPosition; calling it again is a no-op.
func (writer *baseEventWriter) Finish() error {
	if !writer.isFinish {
		writer.isFinish = true
		if err := writer.FinishPayloadWriter(); err != nil {
			return err
		}
		eventLength, err := writer.GetMemoryUsageInBytes()
		if err != nil {
			return err
		}
		writer.eventLength = eventLength
		writer.nextPosition = eventLength + writer.offset
	}
	return nil
}
// Close finishes and releases the payload writer; subsequent calls are no-ops.
func (writer *baseEventWriter) Close() error {
	if writer.isClosed {
		return nil
	}
	writer.isFinish = true
	writer.isClosed = true
	return writer.ReleasePayloadWriter()
}
// One thin wrapper per event type: each couples the shared baseEventWriter
// with that type's fixed-part event data.

type insertEventWriter struct {
	baseEventWriter
	insertEventData
}

type deleteEventWriter struct {
	baseEventWriter
	deleteEventData
}

type createCollectionEventWriter struct {
	baseEventWriter
	createCollectionEventData
}

type dropCollectionEventWriter struct {
	baseEventWriter
	dropCollectionEventData
}

type createPartitionEventWriter struct {
	baseEventWriter
	createPartitionEventData
}

type dropPartitionEventWriter struct {
	baseEventWriter
	dropPartitionEventData
}

// newDescriptorEvent builds the descriptor event (header + data) that opens
// a binlog file.
func newDescriptorEvent() descriptorEvent {
	return descriptorEvent{
		descriptorEventHeader: newDescriptorEventHeader(),
		descriptorEventData:   newDescriptorEventData(),
	}
}
// Each constructor below creates a writer for one event of its type: it opens
// a payload writer for the given data type, embeds a fresh header and event
// data, and wires the base writer's serialization hooks to the type-specific
// fixed part. offset is the event's position in the binlog file.

// newInsertEventWriter creates a writer for one insert event.
func newInsertEventWriter(dataType schemapb.DataType, offset int32) (*insertEventWriter, error) {
	payloadWriter, err := NewPayloadWriter(dataType)
	if err != nil {
		return nil, err
	}
	writer := &insertEventWriter{
		baseEventWriter: baseEventWriter{
			eventHeader:            newEventHeader(InsertEventType),
			PayloadWriterInterface: payloadWriter,
			isClosed:               false,
			isFinish:               false,
			offset:                 offset,
		},
		insertEventData: newInsertEventData(),
	}
	// Wire the type-specific fixed-part serialization into the shared base.
	writer.baseEventWriter.getEventDataSize = writer.insertEventData.GetEventDataSize
	writer.baseEventWriter.writeEventData = writer.insertEventData.WriteEventData
	return writer, nil
}

// newDeleteEventWriter creates a writer for one delete event.
func newDeleteEventWriter(dataType schemapb.DataType, offset int32) (*deleteEventWriter, error) {
	payloadWriter, err := NewPayloadWriter(dataType)
	if err != nil {
		return nil, err
	}
	writer := &deleteEventWriter{
		baseEventWriter: baseEventWriter{
			eventHeader:            newEventHeader(DeleteEventType),
			PayloadWriterInterface: payloadWriter,
			isClosed:               false,
			isFinish:               false,
			offset:                 offset,
		},
		deleteEventData: newDeleteEventData(),
	}
	writer.baseEventWriter.getEventDataSize = writer.deleteEventData.GetEventDataSize
	writer.baseEventWriter.writeEventData = writer.deleteEventData.WriteEventData
	return writer, nil
}

// newCreateCollectionEventWriter creates a writer for one create-collection event.
func newCreateCollectionEventWriter(dataType schemapb.DataType, offset int32) (*createCollectionEventWriter, error) {
	payloadWriter, err := NewPayloadWriter(dataType)
	if err != nil {
		return nil, err
	}
	writer := &createCollectionEventWriter{
		baseEventWriter: baseEventWriter{
			eventHeader:            newEventHeader(CreateCollectionEventType),
			PayloadWriterInterface: payloadWriter,
			isClosed:               false,
			isFinish:               false,
			offset:                 offset,
		},
		createCollectionEventData: newCreateCollectionEventData(),
	}
	writer.baseEventWriter.getEventDataSize = writer.createCollectionEventData.GetEventDataSize
	writer.baseEventWriter.writeEventData = writer.createCollectionEventData.WriteEventData
	return writer, nil
}

// newDropCollectionEventWriter creates a writer for one drop-collection event.
func newDropCollectionEventWriter(dataType schemapb.DataType, offset int32) (*dropCollectionEventWriter, error) {
	payloadWriter, err := NewPayloadWriter(dataType)
	if err != nil {
		return nil, err
	}
	writer := &dropCollectionEventWriter{
		baseEventWriter: baseEventWriter{
			eventHeader:            newEventHeader(DropCollectionEventType),
			PayloadWriterInterface: payloadWriter,
			isClosed:               false,
			isFinish:               false,
			offset:                 offset,
		},
		dropCollectionEventData: newDropCollectionEventData(),
	}
	writer.baseEventWriter.getEventDataSize = writer.dropCollectionEventData.GetEventDataSize
	writer.baseEventWriter.writeEventData = writer.dropCollectionEventData.WriteEventData
	return writer, nil
}

// newCreatePartitionEventWriter creates a writer for one create-partition event.
func newCreatePartitionEventWriter(dataType schemapb.DataType, offset int32) (*createPartitionEventWriter, error) {
	payloadWriter, err := NewPayloadWriter(dataType)
	if err != nil {
		return nil, err
	}
	writer := &createPartitionEventWriter{
		baseEventWriter: baseEventWriter{
			eventHeader:            newEventHeader(CreatePartitionEventType),
			PayloadWriterInterface: payloadWriter,
			isClosed:               false,
			isFinish:               false,
			offset:                 offset,
		},
		createPartitionEventData: newCreatePartitionEventData(),
	}
	writer.baseEventWriter.getEventDataSize = writer.createPartitionEventData.GetEventDataSize
	writer.baseEventWriter.writeEventData = writer.createPartitionEventData.WriteEventData
	return writer, nil
}

// newDropPartitionEventWriter creates a writer for one drop-partition event.
func newDropPartitionEventWriter(dataType schemapb.DataType, offset int32) (*dropPartitionEventWriter, error) {
	payloadWriter, err := NewPayloadWriter(dataType)
	if err != nil {
		return nil, err
	}
	writer := &dropPartitionEventWriter{
		baseEventWriter: baseEventWriter{
			eventHeader:            newEventHeader(DropPartitionEventType),
			PayloadWriterInterface: payloadWriter,
			isClosed:               false,
			isFinish:               false,
			offset:                 offset,
		},
		dropPartitionEventData: newDropPartitionEventData(),
	}
	writer.baseEventWriter.getEventDataSize = writer.dropPartitionEventData.GetEventDataSize
	writer.baseEventWriter.writeEventData = writer.dropPartitionEventData.WriteEventData
	return writer, nil
}

View File

@ -1,44 +0,0 @@
package storage
import (
"bytes"
"testing"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
"github.com/stretchr/testify/assert"
)
// TestEventWriter covers the insert event writer lifecycle: type-checked
// payload appends, Finish fixing the header length, serialization, and Close.
func TestEventWriter(t *testing.T) {
	insertEvent, err := newInsertEventWriter(schemapb.DataType_INT32, 0)
	assert.Nil(t, err)
	defer insertEvent.Close()
	// Close is idempotent-safe to call early on an empty writer.
	err = insertEvent.Close()
	assert.Nil(t, err)
	insertEvent, err = newInsertEventWriter(schemapb.DataType_INT32, 0)
	assert.Nil(t, err)
	// Payload appends must match the declared data type.
	err = insertEvent.AddInt64ToPayload([]int64{1, 1})
	assert.NotNil(t, err)
	err = insertEvent.AddInt32ToPayload([]int32{1, 2, 3})
	assert.Nil(t, err)
	nums, err := insertEvent.GetPayloadLengthFromWriter()
	assert.Nil(t, err)
	assert.EqualValues(t, 3, nums)
	// Finish seals the payload and records the event length in the header.
	err = insertEvent.Finish()
	assert.Nil(t, err)
	length, err := insertEvent.GetMemoryUsageInBytes()
	assert.Nil(t, err)
	assert.EqualValues(t, length, insertEvent.eventLength)
	// No appends after Finish.
	err = insertEvent.AddInt32ToPayload([]int32{1})
	assert.NotNil(t, err)
	// The serialized form occupies exactly the reported size.
	buffer := new(bytes.Buffer)
	err = insertEvent.Write(buffer)
	assert.Nil(t, err)
	length, err = insertEvent.GetMemoryUsageInBytes()
	assert.Nil(t, err)
	assert.EqualValues(t, length, buffer.Len())
	err = insertEvent.Close()
	assert.Nil(t, err)
}

View File

@ -9,8 +9,6 @@ package storage
*/
import "C"
import (
"fmt"
"strconv"
"unsafe"
"github.com/zilliztech/milvus-distributed/internal/errors"
@ -18,41 +16,6 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
)
// PayloadWriterInterface builds a typed payload column; exactly one of the
// Add* families is expected to match the writer's declared data type.
type PayloadWriterInterface interface {
	// AddDataToPayload dispatches to the typed Add* method matching msgs;
	// dim is used by the vector variants.
	AddDataToPayload(msgs interface{}, dim ...int) error
	AddBoolToPayload(msgs []bool) error
	AddInt8ToPayload(msgs []int8) error
	AddInt16ToPayload(msgs []int16) error
	AddInt32ToPayload(msgs []int32) error
	AddInt64ToPayload(msgs []int64) error
	AddFloatToPayload(msgs []float32) error
	AddDoubleToPayload(msgs []float64) error
	AddOneStringToPayload(msgs string) error
	AddBinaryVectorToPayload(binVec []byte, dim int) error
	AddFloatVectorToPayload(binVec []float32, dim int) error
	// FinishPayloadWriter seals the payload; no further appends.
	FinishPayloadWriter() error
	// GetPayloadBufferFromWriter returns the serialized payload bytes.
	GetPayloadBufferFromWriter() ([]byte, error)
	// GetPayloadLengthFromWriter returns the number of rows written.
	GetPayloadLengthFromWriter() (int, error)
	// ReleasePayloadWriter frees the underlying (cgo) writer resources.
	ReleasePayloadWriter() error
	Close() error
}

// PayloadReaderInterface reads a typed payload column back; the Get* method
// used is expected to match the payload's data type.
type PayloadReaderInterface interface {
	// GetDataFromPayload dispatches to the typed Get* method; it returns the
	// data, the vector dimension where applicable, and an error.
	GetDataFromPayload(idx ...int) (interface{}, int, error)
	GetBoolFromPayload() ([]bool, error)
	GetInt8FromPayload() ([]int8, error)
	GetInt16FromPayload() ([]int16, error)
	GetInt32FromPayload() ([]int32, error)
	GetInt64FromPayload() ([]int64, error)
	GetFloatFromPayload() ([]float32, error)
	GetDoubleFromPayload() ([]float64, error)
	GetOneStringFromPayload(idx int) (string, error)
	GetBinaryVectorFromPayload() ([]byte, int, error)
	GetFloatVectorFromPayload() ([]float32, int, error)
	// GetPayloadLengthFromReader returns the number of rows in the payload.
	GetPayloadLengthFromReader() (int, error)
	// ReleasePayloadReader frees the underlying (cgo) reader resources.
	ReleasePayloadReader() error
	Close() error
}
type (
PayloadWriter struct {
payloadWriterPtr C.CPayloadWriter
@ -382,7 +345,6 @@ func (w *PayloadWriter) GetPayloadBufferFromWriter() ([]byte, error) {
cb := C.GetPayloadBufferFromWriter(w.payloadWriterPtr)
pointer := unsafe.Pointer(cb.data)
length := int(cb.length)
fmt.Print("payload length: " + strconv.Itoa(length))
if length <= 0 {
return nil, errors.New("empty buffer")
}