mirror of https://github.com/milvus-io/milvus.git
fix: fix memory leak when cancel segcore task (#29431)
#29430 Signed-off-by: luzhang <luzhang@zilliz.com> Co-authored-by: luzhang <luzhang@zilliz.com>pull/29437/head
parent
dd9c61831d
commit
1cbe3cd5fc
|
@ -207,6 +207,11 @@ class Driver : public std::enable_shared_from_this<Driver> {
|
||||||
Init(std::unique_ptr<DriverContext> driver_ctx,
|
Init(std::unique_ptr<DriverContext> driver_ctx,
|
||||||
std::vector<std::unique_ptr<Operator>> operators);
|
std::vector<std::unique_ptr<Operator>> operators);
|
||||||
|
|
||||||
|
void
|
||||||
|
CloseByTask() {
|
||||||
|
Close();
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
Driver() = default;
|
Driver() = default;
|
||||||
|
|
||||||
|
|
|
@ -129,6 +129,13 @@ Task::CreateDriversLocked(std::shared_ptr<Task>& self,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
Task::Terminate(TaskState state) {
|
||||||
|
for (auto& driver : drivers_) {
|
||||||
|
driver->CloseByTask();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
RowVectorPtr
|
RowVectorPtr
|
||||||
Task::Next(ContinueFuture* future) {
|
Task::Next(ContinueFuture* future) {
|
||||||
// NOTE: Task::Next is single-threaded execution
|
// NOTE: Task::Next is single-threaded execution
|
||||||
|
|
|
@ -149,8 +149,7 @@ class Task : public std::enable_shared_from_this<Task> {
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
Terminate(TaskState state) {
|
Terminate(TaskState state);
|
||||||
}
|
|
||||||
|
|
||||||
std::exception_ptr
|
std::exception_ptr
|
||||||
error() const {
|
error() const {
|
||||||
|
@ -167,6 +166,11 @@ class Task : public std::enable_shared_from_this<Task> {
|
||||||
num_finished_drivers_++;
|
num_finished_drivers_++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
RequestCancel() {
|
||||||
|
Terminate(TaskState::kCanceled);
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::string uuid_;
|
std::string uuid_;
|
||||||
|
|
||||||
|
|
|
@ -116,6 +116,7 @@ ExecPlanNodeVisitor::ExecuteExprNodeInternal(
|
||||||
if (cache_offset_vec->size() == 0) {
|
if (cache_offset_vec->size() == 0) {
|
||||||
auto active_count = segment->get_active_count(timestamp_);
|
auto active_count = segment->get_active_count(timestamp_);
|
||||||
bitset_holder.resize(active_count);
|
bitset_holder.resize(active_count);
|
||||||
|
task->RequestCancel();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
auto cache_offset_vec_ptr =
|
auto cache_offset_vec_ptr =
|
||||||
|
|
Loading…
Reference in New Issue