mirror of https://github.com/milvus-io/milvus.git
fix the bugs of delete all and compact (#3395)
* fix the bugs of delete all and compact Signed-off-by: godchen0212 <qingxiang.chen@zilliz.com> * fix the wrong usages in unittest Signed-off-by: godchen0212 <qingxiang.chen@zilliz.com> * fix the bug of insert makes no effect Signed-off-by: godchen0212 <qingxiang.chen@zilliz.com> * add character check in ExtraFileInfo and change the const size and type Signed-off-by: godchen0212 <qingxiang.chen@zilliz.com> * format code Signed-off-by: godchen0212 <qingxiang.chen@zilliz.com> * fix wrong test case Signed-off-by: godchen0212 <qingxiang.chen@zilliz.com>pull/3416/head
parent
21407a5ce0
commit
2e5ff884c1
|
@ -757,6 +757,15 @@ DBImpl::Compact(const std::shared_ptr<server::Context>& context, const std::stri
|
|||
auto segment_commit = latest_ss->GetSegmentCommitBySegmentId(segment_id);
|
||||
auto row_count = segment_commit->GetRowCount();
|
||||
if (row_count == 0) {
|
||||
snapshot::OperationContext drop_seg_context;
|
||||
auto seg = latest_ss->GetResource<snapshot::Segment>(segment_id);
|
||||
drop_seg_context.prev_segment = seg;
|
||||
auto drop_op = std::make_shared<snapshot::DropSegmentOperation>(drop_seg_context, latest_ss);
|
||||
status = drop_op->Push();
|
||||
if (!status.ok()) {
|
||||
LOG_ENGINE_ERROR_ << "Compact failed for segment " << segment_reader->GetSegmentPath() << ": "
|
||||
<< status.message();
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
|
@ -566,6 +566,33 @@ NewSegmentOperation::CommitRowCount(SIZE_TYPE row_cnt) {
|
|||
return Status::OK();
|
||||
}
|
||||
|
||||
DropSegmentOperation::DropSegmentOperation(const OperationContext& context, ScopedSnapshotT prev_ss)
|
||||
: BaseT(context, prev_ss) {
|
||||
}
|
||||
|
||||
Status
|
||||
DropSegmentOperation::DoExecute(StorePtr store) {
|
||||
OperationContext pc_context;
|
||||
// create a empty segment commit
|
||||
pc_context.stale_segments.push_back(context_.prev_segment);
|
||||
PartitionCommitOperation pc_op(pc_context, GetAdjustedSS());
|
||||
STATUS_CHECK(pc_op(store));
|
||||
STATUS_CHECK(pc_op.GetResource(pc_context.new_partition_commit));
|
||||
auto pc_ctx_p = ResourceContextBuilder<PartitionCommit>().SetOp(meta::oUpdate).CreatePtr();
|
||||
AddStepWithLsn(*pc_context.new_partition_commit, context_.lsn, pc_ctx_p);
|
||||
|
||||
auto cc_context = OperationContext();
|
||||
cc_context.new_partition_commits.push_back(pc_context.new_partition_commit);
|
||||
|
||||
CollectionCommitOperation cc_op(cc_context, GetAdjustedSS());
|
||||
STATUS_CHECK(cc_op(store));
|
||||
STATUS_CHECK(cc_op.GetResource(context_.new_collection_commit));
|
||||
auto cc_ctx_p = ResourceContextBuilder<CollectionCommit>().SetOp(meta::oUpdate).CreatePtr();
|
||||
AddStepWithLsn(*context_.new_collection_commit, context_.lsn, cc_ctx_p);
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
NewSegmentOperation::DoExecute(StorePtr store) {
|
||||
// PXU TODO:
|
||||
|
|
|
@ -177,6 +177,19 @@ class NewSegmentOperation : public CompoundBaseOperation<NewSegmentOperation> {
|
|||
SIZE_TYPE row_cnt_ = 0;
|
||||
};
|
||||
|
||||
class DropSegmentOperation : public CompoundBaseOperation<DropSegmentOperation> {
|
||||
public:
|
||||
using BaseT = CompoundBaseOperation<DropSegmentOperation>;
|
||||
static constexpr const char* Name = "DS";
|
||||
|
||||
DropSegmentOperation(const OperationContext& context, ScopedSnapshotT prev_ss);
|
||||
|
||||
Status DoExecute(StorePtr) override;
|
||||
|
||||
// Status
|
||||
// AddStaleSegment();
|
||||
};
|
||||
|
||||
class MergeOperation : public CompoundBaseOperation<MergeOperation> {
|
||||
public:
|
||||
using BaseT = CompoundBaseOperation<MergeOperation>;
|
||||
|
|
|
@ -96,6 +96,8 @@ PartitionCommitOperation::GetPrevResource() const {
|
|||
return GetStartedSS()->GetPartitionCommitByPartitionId(context_.new_segment_commit->GetPartitionId());
|
||||
} else if (context_.new_segment_commits.size() > 0) {
|
||||
return GetStartedSS()->GetPartitionCommitByPartitionId(context_.new_segment_commits[0]->GetPartitionId());
|
||||
} else if (!context_.stale_segments.empty()) {
|
||||
return GetStartedSS()->GetPartitionCommitByPartitionId(context_.stale_segments[0]->GetPartitionId());
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
|
|
@ -1285,6 +1285,16 @@ GrpcRequestHandler::Insert(::grpc::ServerContext* context, const ::milvus::grpc:
|
|||
CHECK_NULLPTR_RETURN(request);
|
||||
LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__);
|
||||
|
||||
engine::IDNumbers vector_ids;
|
||||
vector_ids.reserve(request->entity_id_array_size());
|
||||
for (int i = 0; i < request->entity_id_array_size(); i++) {
|
||||
if (request->entity_id_array(i) < 0) {
|
||||
auto status = Status{SERVER_INVALID_ROWRECORD_ARRAY, "id can not be negative number"};
|
||||
SET_RESPONSE(response->mutable_status(), status, context);
|
||||
return ::grpc::Status::OK;
|
||||
}
|
||||
}
|
||||
|
||||
auto field_size = request->fields_size();
|
||||
|
||||
std::unordered_map<std::string, std::vector<uint8_t>> chunk_data;
|
||||
|
|
|
@ -18,10 +18,15 @@
|
|||
#include "storage/ExtraFileInfo.h"
|
||||
|
||||
const char* MAGIC = "Milvus";
|
||||
const int MAGIC_SIZE = 6;
|
||||
const int SINGLE_KV_DATA_SIZE = 64;
|
||||
const int HEADER_SIZE = 4096;
|
||||
const int SUM_SIZE = 16;
|
||||
const int64_t MAGIC_SIZE = 6;
|
||||
const int64_t HEADER_SIZE = 4090;
|
||||
const int64_t SUM_SIZE = 16;
|
||||
|
||||
bool
|
||||
validate(std::string s) {
|
||||
std::regex test("[=;]+");
|
||||
return !std::regex_match(s.begin(), s.end(), test);
|
||||
}
|
||||
|
||||
namespace milvus {
|
||||
namespace storage {
|
||||
|
@ -167,7 +172,11 @@ WriteHeaderValues(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p
|
|||
|
||||
std::string kv;
|
||||
for (auto& map : maps) {
|
||||
kv.append(map.first + "=" + map.second + ";");
|
||||
if (validate(map.first) && validate(map.second)) {
|
||||
kv.append(map.first + "=" + map.second + ";");
|
||||
} else {
|
||||
throw "Equal and semicolon are illegal character in header data";
|
||||
}
|
||||
}
|
||||
if (kv.size() > HEADER_SIZE) {
|
||||
throw "Exceeded the limit of header data size";
|
||||
|
|
|
@ -29,10 +29,9 @@
|
|||
#include "storage/FSHandler.h"
|
||||
|
||||
extern const char* MAGIC;
|
||||
extern const int MAGIC_SIZE;
|
||||
extern const int SINGLE_KV_DATA_SIZE;
|
||||
extern const int HEADER_SIZE;
|
||||
extern const int SUM_SIZE;
|
||||
extern const int64_t MAGIC_SIZE;
|
||||
extern const int64_t HEADER_SIZE;
|
||||
extern const int64_t SUM_SIZE;
|
||||
|
||||
namespace milvus {
|
||||
namespace storage {
|
||||
|
|
|
@ -459,6 +459,72 @@ TEST_F(SnapshotTest, PartitionTest2) {
|
|||
ASSERT_FALSE(status.ok());
|
||||
}
|
||||
|
||||
TEST_F(SnapshotTest, DropSegmentTest){
|
||||
LSN_TYPE lsn = 0;
|
||||
auto next_lsn = [&]() -> decltype(lsn) {
|
||||
return ++lsn;
|
||||
};
|
||||
auto collection_name = "test";
|
||||
ScopedSnapshotT ss;
|
||||
ss = CreateCollection(collection_name, ++lsn);
|
||||
|
||||
|
||||
milvus::Status status;
|
||||
|
||||
PartitionContext pp_ctx;
|
||||
std::stringstream p_name_stream;
|
||||
|
||||
auto num = RandomInt(3, 5);
|
||||
for (auto i = 0; i < num; ++i) {
|
||||
p_name_stream.str("");
|
||||
p_name_stream << "partition_" << i;
|
||||
pp_ctx.name = p_name_stream.str();
|
||||
ss = CreatePartition(ss->GetName(), pp_ctx, next_lsn());
|
||||
ASSERT_TRUE(ss);
|
||||
}
|
||||
|
||||
ASSERT_EQ(ss->NumberOfPartitions(), num + 1);
|
||||
|
||||
auto total_row_cnt = 0;
|
||||
auto partitions = ss->GetResources<Partition>();
|
||||
SegmentFileContext sf_context;
|
||||
SFContextBuilder(sf_context, ss);
|
||||
|
||||
for (auto& kv : partitions) {
|
||||
num = RandomInt(2, 5);
|
||||
auto row_cnt = 1024;
|
||||
for (auto i = 0; i < num; ++i) {
|
||||
ASSERT_TRUE(CreateSegment(ss, kv.first, next_lsn(), sf_context, row_cnt).ok());
|
||||
total_row_cnt += row_cnt;
|
||||
}
|
||||
}
|
||||
|
||||
status = Snapshots::GetInstance().GetSnapshot(ss, collection_name);
|
||||
ASSERT_TRUE(status.ok());
|
||||
ASSERT_EQ(total_row_cnt, ss->GetCollectionCommit()->GetRowCount());
|
||||
|
||||
OperationContext drop_seg_context;
|
||||
auto segments = ss->GetResources<Segment>();
|
||||
auto prev_partitions = ss->GetResources<Partition>();
|
||||
ASSERT_TRUE(!prev_partitions.empty());
|
||||
ASSERT_TRUE(!segments.empty());
|
||||
for (auto kv:segments){
|
||||
milvus::engine::snapshot::ID_TYPE segment_id = kv.first;
|
||||
auto seg = ss->GetResource<milvus::engine::snapshot::Segment>(segment_id);
|
||||
drop_seg_context.prev_segment = seg;
|
||||
auto drop_op = std::make_shared<milvus::engine::snapshot::DropSegmentOperation>(drop_seg_context, ss);
|
||||
status = drop_op->Push();
|
||||
ASSERT_TRUE(status.ok());
|
||||
}
|
||||
status = Snapshots::GetInstance().GetSnapshot(ss, collection_name);
|
||||
ASSERT_TRUE(status.ok());
|
||||
auto result_segments = ss->GetResources<Segment>();
|
||||
auto result_partitions = ss->GetResources<Partition>();
|
||||
ASSERT_TRUE(!result_partitions.empty());
|
||||
ASSERT_TRUE(result_segments.empty());
|
||||
|
||||
}
|
||||
|
||||
TEST_F(SnapshotTest, IndexTest) {
|
||||
LSN_TYPE lsn = 0;
|
||||
auto next_lsn = [&]() -> decltype(lsn) {
|
||||
|
|
|
@ -827,9 +827,6 @@ class TestInsertInvalid(object):
|
|||
'''
|
||||
entity_id = get_entity_id
|
||||
ids = [entity_id for _ in range(nb)]
|
||||
# if isinstance(entity_id, int):
|
||||
# connect.insert(id_collection, entities, ids)
|
||||
# else:
|
||||
with pytest.raises(Exception):
|
||||
connect.insert(id_collection, entities, ids)
|
||||
|
||||
|
|
Loading…
Reference in New Issue