mirror of https://github.com/milvus-io/milvus.git
enhance: Throw error instead of crash when index cannot be built (#31844)
issue: #27589 --------- Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>pull/32013/head^2
parent
1cd15d9322
commit
1b767669a4
|
@ -59,8 +59,17 @@ VectorDiskAnnIndex<T>::VectorDiskAnnIndex(
|
|||
local_chunk_manager->CreateDir(local_index_path_prefix);
|
||||
auto diskann_index_pack =
|
||||
knowhere::Pack(std::shared_ptr<knowhere::FileManager>(file_manager_));
|
||||
index_ = knowhere::IndexFactory::Instance().Create<T>(
|
||||
auto get_index_obj = knowhere::IndexFactory::Instance().Create<T>(
|
||||
GetIndexType(), version, diskann_index_pack);
|
||||
if (get_index_obj.has_value()) {
|
||||
index_ = get_index_obj.value();
|
||||
} else {
|
||||
auto err = get_index_obj.error();
|
||||
if (err == knowhere::Status::invalid_index_error) {
|
||||
throw SegcoreError(ErrorCode::Unsupported, get_index_obj.what());
|
||||
}
|
||||
throw SegcoreError(ErrorCode::KnowhereError, get_index_obj.what());
|
||||
}
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
|
@ -88,8 +97,17 @@ VectorDiskAnnIndex<T>::VectorDiskAnnIndex(
|
|||
local_chunk_manager->CreateDir(local_index_path_prefix);
|
||||
auto diskann_index_pack =
|
||||
knowhere::Pack(std::shared_ptr<knowhere::FileManager>(file_manager_));
|
||||
index_ = knowhere::IndexFactory::Instance().Create<T>(
|
||||
auto get_index_obj = knowhere::IndexFactory::Instance().Create<T>(
|
||||
GetIndexType(), version, diskann_index_pack);
|
||||
if (get_index_obj.has_value()) {
|
||||
index_ = get_index_obj.value();
|
||||
} else {
|
||||
auto err = get_index_obj.error();
|
||||
if (err == knowhere::Status::invalid_index_error) {
|
||||
throw SegcoreError(ErrorCode::Unsupported, get_index_obj.what());
|
||||
}
|
||||
throw SegcoreError(ErrorCode::KnowhereError, get_index_obj.what());
|
||||
}
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
|
|
|
@ -72,8 +72,17 @@ VectorMemIndex<T>::VectorMemIndex(
|
|||
AssertInfo(file_manager_ != nullptr, "create file manager failed!");
|
||||
}
|
||||
CheckCompatible(version);
|
||||
index_ =
|
||||
auto get_index_obj =
|
||||
knowhere::IndexFactory::Instance().Create<T>(GetIndexType(), version);
|
||||
if (get_index_obj.has_value()) {
|
||||
index_ = get_index_obj.value();
|
||||
} else {
|
||||
auto err = get_index_obj.error();
|
||||
if (err == knowhere::Status::invalid_index_error) {
|
||||
throw SegcoreError(ErrorCode::Unsupported, get_index_obj.what());
|
||||
}
|
||||
throw SegcoreError(ErrorCode::KnowhereError, get_index_obj.what());
|
||||
}
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
|
@ -95,8 +104,17 @@ VectorMemIndex<T>::VectorMemIndex(
|
|||
}
|
||||
auto version = create_index_info.index_engine_version;
|
||||
CheckCompatible(version);
|
||||
index_ =
|
||||
auto get_index_obj =
|
||||
knowhere::IndexFactory::Instance().Create<T>(GetIndexType(), version);
|
||||
if (get_index_obj.has_value()) {
|
||||
index_ = get_index_obj.value();
|
||||
} else {
|
||||
auto err = get_index_obj.error();
|
||||
if (err == knowhere::Status::invalid_index_error) {
|
||||
throw SegcoreError(ErrorCode::Unsupported, get_index_obj.what());
|
||||
}
|
||||
throw SegcoreError(ErrorCode::KnowhereError, get_index_obj.what());
|
||||
}
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
|
|
|
@ -72,6 +72,11 @@ CreateIndexV0(enum CDataType dtype,
|
|||
*res_index = index.release();
|
||||
status.error_code = Success;
|
||||
status.error_msg = "";
|
||||
} catch (SegcoreError& e) {
|
||||
auto status = CStatus();
|
||||
status.error_code = e.get_error_code();
|
||||
status.error_msg = strdup(e.what());
|
||||
return status;
|
||||
} catch (std::exception& e) {
|
||||
status.error_code = UnexpectedError;
|
||||
status.error_msg = strdup(e.what());
|
||||
|
@ -140,6 +145,11 @@ CreateIndex(CIndex* res_index, CBuildIndexInfo c_build_index_info) {
|
|||
status.error_code = Success;
|
||||
status.error_msg = "";
|
||||
return status;
|
||||
} catch (SegcoreError& e) {
|
||||
auto status = CStatus();
|
||||
status.error_code = e.get_error_code();
|
||||
status.error_msg = strdup(e.what());
|
||||
return status;
|
||||
} catch (std::exception& e) {
|
||||
auto status = CStatus();
|
||||
status.error_code = UnexpectedError;
|
||||
|
@ -228,6 +238,11 @@ CreateIndexV2(CIndex* res_index, CBuildIndexInfo c_build_index_info) {
|
|||
index->BuildV2();
|
||||
*res_index = index.release();
|
||||
return milvus::SuccessCStatus();
|
||||
} catch (SegcoreError& e) {
|
||||
auto status = CStatus();
|
||||
status.error_code = e.get_error_code();
|
||||
status.error_msg = strdup(e.what());
|
||||
return status;
|
||||
} catch (std::exception& e) {
|
||||
return milvus::FailureCStatus(&e);
|
||||
}
|
||||
|
|
|
@ -12,7 +12,7 @@
|
|||
#-------------------------------------------------------------------------------
|
||||
|
||||
# Update KNOWHERE_VERSION for the first occurrence
|
||||
set( KNOWHERE_VERSION e141e8b )
|
||||
set( KNOWHERE_VERSION 3c46f4c7 )
|
||||
set( GIT_REPOSITORY "https://github.com/zilliztech/knowhere.git")
|
||||
message(STATUS "Knowhere repo: ${GIT_REPOSITORY}")
|
||||
message(STATUS "Knowhere version: ${KNOWHERE_VERSION}")
|
||||
|
|
|
@ -1985,9 +1985,10 @@ TEST(CApiTest, LoadIndexInfo) {
|
|||
|
||||
auto N = 1024 * 10;
|
||||
auto [raw_data, timestamps, uids] = generate_data(N);
|
||||
auto indexing = knowhere::IndexFactory::Instance().Create<float>(
|
||||
auto get_index_obj = knowhere::IndexFactory::Instance().Create<float>(
|
||||
knowhere::IndexEnum::INDEX_FAISS_IVFSQ8,
|
||||
knowhere::Version::GetCurrentVersion().VersionNumber());
|
||||
auto indexing = get_index_obj.value();
|
||||
auto conf =
|
||||
knowhere::Json{{knowhere::meta::METRIC_TYPE, knowhere::metric::L2},
|
||||
{knowhere::meta::DIM, DIM},
|
||||
|
@ -2035,9 +2036,10 @@ TEST(CApiTest, LoadIndexSearch) {
|
|||
auto N = 1024 * 10;
|
||||
auto num_query = 100;
|
||||
auto [raw_data, timestamps, uids] = generate_data(N);
|
||||
auto indexing = knowhere::IndexFactory::Instance().Create<float>(
|
||||
auto get_index_obj = knowhere::IndexFactory::Instance().Create<float>(
|
||||
knowhere::IndexEnum::INDEX_FAISS_IVFSQ8,
|
||||
knowhere::Version::GetCurrentVersion().VersionNumber());
|
||||
auto indexing = get_index_obj.value();
|
||||
auto conf =
|
||||
knowhere::Json{{knowhere::meta::METRIC_TYPE, knowhere::metric::L2},
|
||||
{knowhere::meta::DIM, DIM},
|
||||
|
|
|
@ -201,6 +201,16 @@ func (sched *TaskScheduler) scheduleIndexBuildTask() []task {
|
|||
return ret
|
||||
}
|
||||
|
||||
func getStateFromError(err error) commonpb.IndexState {
|
||||
if errors.Is(err, errCancel) {
|
||||
return commonpb.IndexState_Retry
|
||||
} else if errors.Is(err, merr.ErrIoKeyNotFound) || errors.Is(err, merr.ErrSegcoreUnsupported) {
|
||||
// NoSuchKey or unsupported error
|
||||
return commonpb.IndexState_Failed
|
||||
}
|
||||
return commonpb.IndexState_Retry
|
||||
}
|
||||
|
||||
func (sched *TaskScheduler) processTask(t task, q TaskQueue) {
|
||||
wrap := func(fn func(ctx context.Context) error) error {
|
||||
select {
|
||||
|
@ -221,14 +231,8 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) {
|
|||
pipelines := []func(context.Context) error{t.Prepare, t.BuildIndex, t.SaveIndexFiles}
|
||||
for _, fn := range pipelines {
|
||||
if err := wrap(fn); err != nil {
|
||||
if errors.Is(err, errCancel) {
|
||||
log.Ctx(t.Ctx()).Warn("index build task canceled, retry it", zap.String("task", t.Name()))
|
||||
t.SetState(commonpb.IndexState_Retry, err.Error())
|
||||
} else if errors.Is(err, merr.ErrIoKeyNotFound) {
|
||||
t.SetState(commonpb.IndexState_Failed, err.Error())
|
||||
} else {
|
||||
t.SetState(commonpb.IndexState_Retry, err.Error())
|
||||
}
|
||||
log.Ctx(t.Ctx()).Warn("process task failed", zap.Error(err))
|
||||
t.SetState(getStateFromError(err), err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
|
|
|
@ -69,6 +69,9 @@ func HandleCStatus(status *C.CStatus, extraInfo string) error {
|
|||
|
||||
logMsg := fmt.Sprintf("%s, C Runtime Exception: %s\n", extraInfo, errorMsg)
|
||||
log.Warn(logMsg)
|
||||
if errorCode == 2003 {
|
||||
return merr.WrapErrSegcoreUnsupported(int32(errorCode), logMsg)
|
||||
}
|
||||
return merr.WrapErrSegcore(int32(errorCode), logMsg)
|
||||
}
|
||||
|
||||
|
|
|
@ -147,7 +147,8 @@ var (
|
|||
ErrInvalidStreamObj = newMilvusError("invalid stream object", 1903, false)
|
||||
|
||||
// Segcore related
|
||||
ErrSegcore = newMilvusError("segcore error", 2000, false)
|
||||
ErrSegcore = newMilvusError("segcore error", 2000, false)
|
||||
ErrSegcoreUnsupported = newMilvusError("segcore unsupported error", 2001, false)
|
||||
|
||||
// Do NOT export this,
|
||||
// never allow programmer using this, keep only for converting unknown error to milvusError
|
||||
|
|
|
@ -870,6 +870,14 @@ func WrapErrSegcore(code int32, msg ...string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
func WrapErrSegcoreUnsupported(code int32, msg ...string) error {
|
||||
err := wrapFields(ErrSegcoreUnsupported, value("segcoreCode", code))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// field related
|
||||
func WrapErrFieldNotFound[T any](field T, msg ...string) error {
|
||||
err := wrapFields(ErrFieldNotFound, value("field", field))
|
||||
|
|
Loading…
Reference in New Issue