enhance: async search and retrieve in cgo (#34200)

issue: #33132
pr: #33133
other pr: #33228, #34084, #33946

- implement a future-based cgo utility
- make search and retrieve asynchronous in cgo
- update the gc configuration documentation

---------

Signed-off-by: chyezh <chyezh@outlook.com>
chyezh 2024-07-04 13:02:09 +08:00 committed by GitHub
parent 3efb78e154
commit a1a0a56f86
44 changed files with 2222 additions and 281 deletions

View File

@ -13,6 +13,9 @@ run:
- ci
skip-files:
- partial_search_test.go
build-tags:
- dynamic
- test
linters:
disable-all: true

View File

@ -155,8 +155,9 @@ lint-fix: getdeps
#TODO: Check code specifications by golangci-lint
static-check: getdeps
@echo "Running $@ check"
@source $(PWD)/scripts/setenv.sh && GO111MODULE=on $(INSTALL_PATH)/golangci-lint run --timeout=30m --config $(PWD)/.golangci.yml
@source $(PWD)/scripts/setenv.sh && cd pkg && GO111MODULE=on $(INSTALL_PATH)/golangci-lint run --timeout=30m --config $(PWD)/.golangci.yml
@source $(PWD)/scripts/setenv.sh && GO111MODULE=on $(INSTALL_PATH)/golangci-lint run --build-tags dynamic,test --timeout=30m --config $(PWD)/.golangci.yml
@source $(PWD)/scripts/setenv.sh && cd pkg && GO111MODULE=on $(INSTALL_PATH)/golangci-lint run --build-tags dynamic,test --timeout=30m --config $(PWD)/.golangci.yml
@source $(PWD)/scripts/setenv.sh && cd client && GO111MODULE=on $(INSTALL_PATH)/golangci-lint run --timeout=30m --config $(PWD)/client/.golangci.yml
verifiers: build-cpp getdeps cppcheck fmt static-check

View File

@ -489,11 +489,11 @@ dataCoord:
deltalogMaxNum: 30 # The maximum number of deltalog files that force-triggers a LevelZero compaction, default 30
enableGarbageCollection: true
gc:
interval: 3600 # gc interval in seconds
missingTolerance: 86400 # file meta missing tolerance duration in seconds, default to 24hr(1d)
dropTolerance: 10800 # file belongs to dropped entity tolerance duration in seconds. 3600
interval: 3600 # meta-based gc scanning interval in seconds
missingTolerance: 86400 # orphan file gc tolerance duration in seconds (an orphan file whose last modified time is older than this tolerance will be deleted)
dropTolerance: 10800 # meta-based gc tolerance duration in seconds (a file whose meta was marked as dropped longer ago than this tolerance will be deleted)
removeConcurrent: 32 # number of concurrent goroutines to remove dropped s3 objects
scanInterval: 168 # garbage collection scan residue interval in hours
scanInterval: 168 # scanning interval in hours for orphan-file garbage collection on object storage (an orphan file exists on object storage but is not registered in meta)
enableActiveStandby: false
brokerTimeout: 5000 # 5000ms, dataCoord broker rpc timeout
autoBalance: true # Enable auto balance

View File

@ -305,6 +305,11 @@ install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/src/common/
FILES_MATCHING PATTERN "*_c.h"
)
install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/src/futures/
DESTINATION include/futures
FILES_MATCHING PATTERN "*.h"
)
install(DIRECTORY ${CMAKE_BINARY_DIR}/lib/
DESTINATION ${CMAKE_INSTALL_FULL_LIBDIR}
)

View File

@ -35,3 +35,4 @@ add_subdirectory( indexbuilder )
add_subdirectory( clustering )
add_subdirectory( exec )
add_subdirectory( bitset )
add_subdirectory( futures )

View File

@ -64,8 +64,10 @@ enum ErrorCode {
MemAllocateFailed = 2034,
MemAllocateSizeNotMatch = 2035,
MmapError = 2036,
// timeout or cancel related.
FollyOtherException = 2037,
FollyCancel = 2038,
KnowhereError = 2100,
};
namespace impl {
void
@ -90,7 +92,7 @@ class SegcoreError : public std::runtime_error {
}
ErrorCode
get_error_code() {
get_error_code() const {
return error_code_;
}
@ -114,9 +116,9 @@ FailureCStatus(int code, const std::string& msg) {
}
inline CStatus
FailureCStatus(std::exception* ex) {
if (dynamic_cast<SegcoreError*>(ex) != nullptr) {
auto segcore_error = dynamic_cast<SegcoreError*>(ex);
FailureCStatus(const std::exception* ex) {
if (dynamic_cast<const SegcoreError*>(ex) != nullptr) {
auto segcore_error = dynamic_cast<const SegcoreError*>(ex);
return CStatus{static_cast<int>(segcore_error->get_error_code()),
strdup(ex->what())};
}

View File

@ -0,0 +1,24 @@
# Copyright (C) 2019-2020 Zilliz. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under the License
milvus_add_pkg_config("milvus_futures")
set(FUTURES_SRC
Executor.cpp
future_c.cpp
future_test_case_c.cpp
)
add_library(milvus_futures SHARED ${FUTURES_SRC})
target_link_libraries(milvus_futures milvus_common)
install(TARGETS milvus_futures DESTINATION "${CMAKE_INSTALL_LIBDIR}")

View File

@ -0,0 +1,29 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <chrono>
#include "Executor.h"
#include "common/Common.h"
namespace milvus::futures {
const int kNumPriority = 3;
folly::CPUThreadPoolExecutor*
getGlobalCPUExecutor() {
static folly::CPUThreadPoolExecutor executor(
std::thread::hardware_concurrency(),
folly::CPUThreadPoolExecutor::makeDefaultPriorityQueue(kNumPriority),
std::make_shared<folly::NamedThreadFactory>("MILVUS_FUTURE_CPU_"));
return &executor;
}
}; // namespace milvus::futures

View File

@ -0,0 +1,30 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#pragma once
#include <memory>
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/executors/task_queue/PriorityLifoSemMPMCQueue.h>
#include <folly/system/HardwareConcurrency.h>
namespace milvus::futures {
namespace ExecutePriority {
const int LOW = 2;
const int NORMAL = 1;
const int HIGH = 0;
} // namespace ExecutePriority
folly::CPUThreadPoolExecutor*
getGlobalCPUExecutor();
}; // namespace milvus::futures
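// Editor's sketch (illustrative, not part of this change): scheduling a task
// on the shared pool at NORMAL priority, the same way Future<R>::asyncProduce
// in Future.h drives it. Assumes <folly/futures/Future.h> is included; the
// lambda body is a placeholder.
inline void
exampleScheduleOnGlobalPool() {
    auto fut = folly::makeSemiFuture()
                   .via(milvus::futures::getGlobalCPUExecutor(),
                        milvus::futures::ExecutePriority::NORMAL)
                   .thenValue([](auto&&) {
                       // CPU-bound work runs here on a "MILVUS_FUTURE_CPU_" thread.
                   });
    // Dropping `fut` is safe in folly: the attached work still runs.
}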

View File

@ -0,0 +1,228 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#pragma once
#include <stdlib.h>
#include <common/EasyAssert.h>
#include <folly/CancellationToken.h>
#include <folly/futures/Future.h>
#include <folly/futures/SharedPromise.h>
#include "future_c_types.h"
#include "LeakyResult.h"
#include "Ready.h"
namespace milvus::futures {
/// @brief an interface class that represents a future which can be called polymorphically from CGO code.
/// implemented by the Future template.
class IFuture {
public:
/// @brief cancel the future.
/// After cancel is called, the underlying async function receives a cancellation signal.
/// It is only a notification; how the cancellation is handled is user-defined.
/// If the underlying async function ignores the signal, the future remains blocked.
virtual void
cancel() = 0;
/// @brief check if the future is ready or canceled.
/// @return true if the future is ready or canceled, otherwise false.
virtual bool
isReady() = 0;
/// @brief register a callback that will be invoked when the future becomes ready; if it is already ready, the callback is invoked immediately.
virtual void
registerReadyCallback(CUnlockGoMutexFn unlockFn, CLockedGoMutex* mutex) = 0;
/// @brief get the result of the future; it must only be called once the future is ready.
/// the first element of the pair is the result,
/// the second element of the pair is the exception status.
/// !!! It may only be called once,
/// and the caller must manually release the returned result after the call.
virtual std::pair<void*, CStatus>
leakyGet() = 0;
/// @brief a leaked future object created by method `Future<R>::createLeakedFuture` can be dropped by this method.
static void
releaseLeakedFuture(IFuture* future) {
delete future;
}
virtual ~IFuture() = default;
};
/// @brief a class that represents a cancellation token
class CancellationToken : public folly::CancellationToken {
public:
CancellationToken(folly::CancellationToken&& token) noexcept
: folly::CancellationToken(std::move(token)) {
}
/// @brief check whether the token is cancelled; throw a FutureCancellation exception if it is.
void
throwIfCancelled() const {
if (isCancellationRequested()) {
throw folly::FutureCancellation();
}
}
};
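// Editor's sketch (illustrative): a cancellation-aware producer in the style
// of future_test_case_c.cpp. It polls the token between slices of work, so a
// consumer-side cancel() surfaces as folly::FutureCancellation and lands in
// the FollyCancel error arm of registerConsumeCallback below:
//
//   auto future = Future<int>::async(
//       getGlobalCPUExecutor(), ExecutePriority::NORMAL,
//       [](CancellationToken token) {
//           for (int i = 0; i < 10; ++i) {
//               token.throwIfCancelled();  // cooperative cancellation point
//               // ... one slice of work ...
//           }
//           return new int(0);  // leaked result, reclaimed via leakyGet()
//       });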
/// @brief Future is a class that binds a folly future to its result for
/// consumption by cgo.
/// @tparam R is the return type of the producer function.
template <class R>
class Future : public IFuture {
public:
/// @brief run an async operation that produces a result.
/// fn returns a pointer to R (leaked, allocated with the default allocator) on success; otherwise it throws an exception.
/// the returned result or exception is handled on the consumer side.
template <typename Fn,
typename = std::enable_if<
std::is_invocable_r_v<R*, Fn, CancellationToken>>>
static std::unique_ptr<Future<R>>
async(folly::Executor::KeepAlive<> executor,
int priority,
Fn&& fn) noexcept {
auto future = std::make_unique<Future<R>>();
// setup the interrupt handler for the promise.
future->setInterruptHandler();
// start async function.
future->asyncProduce(executor, priority, std::forward<Fn>(fn));
// register consume callback function.
future->registerConsumeCallback(executor, priority);
return future;
}
/// use `async` to create a Future instead of calling this constructor directly.
Future()
: ready_(std::make_shared<Ready<LeakyResult<R>>>()),
promise_(std::make_shared<folly::SharedPromise<R*>>()),
cancellation_source_() {
}
Future(const Future<R>&) = delete;
Future(Future<R>&&) noexcept = default;
Future&
operator=(const Future<R>&) = delete;
Future&
operator=(Future<R>&&) noexcept = default;
/// @brief see `IFuture::cancel`
void
cancel() noexcept override {
promise_->getSemiFuture().cancel();
}
/// @brief see `IFuture::registerReadyCallback`
void
registerReadyCallback(CUnlockGoMutexFn unlockFn,
CLockedGoMutex* mutex) noexcept override {
ready_->callOrRegisterCallback(
[unlockFn = unlockFn, mutex = mutex]() { unlockFn(mutex); });
}
/// @brief see `IFuture::isReady`
bool
isReady() noexcept override {
return ready_->isReady();
}
/// @brief see `IFuture::leakyGet`
std::pair<void*, CStatus>
leakyGet() noexcept override {
auto result = std::move(*ready_).getValue();
return result.leakyGet();
}
private:
/// @brief set the interrupt handler for the promise used in the async produce arm.
void
setInterruptHandler() {
promise_->setInterruptHandler([cancellation_source =
cancellation_source_,
ready = ready_](
const folly::exception_wrapper& ew) {
// 1. set the result to perform a fast fail.
// 2. request cancellation on the source to notify the consumers.
ew.handle(
[&](const folly::FutureCancellation& e) {
cancellation_source.requestCancellation();
},
[&](const folly::FutureTimeout& e) {
cancellation_source.requestCancellation();
});
});
}
/// @brief perform the R-producing operation asynchronously.
template <typename Fn,
typename... Args,
typename = std::enable_if<
std::is_invocable_r_v<R*, Fn, CancellationToken>>>
void
asyncProduce(folly::Executor::KeepAlive<> executor, int priority, Fn&& fn) {
// start produce process async.
auto cancellation_token =
CancellationToken(cancellation_source_.getToken());
auto runner = [fn = std::forward<Fn>(fn),
cancellation_token = std::move(cancellation_token)]() {
cancellation_token.throwIfCancelled();
return fn(cancellation_token);
};
// the runner may be executed in a different thread,
// so manage the promise with a shared_ptr.
auto thenRunner = [promise = promise_, runner = std::move(runner)](
auto&&) { promise->setWith(std::move(runner)); };
folly::makeSemiFuture().via(executor, priority).then(thenRunner);
}
/// @brief consume the result of the future asynchronously.
void
registerConsumeCallback(folly::Executor::KeepAlive<> executor,
int priority) noexcept {
// set up the result consume arm and exception consume arm.
promise_->getSemiFuture()
.via(executor, priority)
.thenValue(
[ready = ready_](R* r) { ready->setValue(LeakyResult<R>(r)); })
.thenError(folly::tag_t<folly::FutureCancellation>{},
[ready = ready_](const folly::FutureCancellation& e) {
ready->setValue(
LeakyResult<R>(milvus::FollyCancel, e.what()));
})
.thenError(folly::tag_t<folly::FutureException>{},
[ready = ready_](const folly::FutureException& e) {
ready->setValue(LeakyResult<R>(
milvus::FollyOtherException, e.what()));
})
.thenError(folly::tag_t<milvus::SegcoreError>{},
[ready = ready_](const milvus::SegcoreError& e) {
ready->setValue(LeakyResult<R>(
static_cast<int>(e.get_error_code()), e.what()));
})
.thenError(folly::tag_t<std::exception>{},
[ready = ready_](const std::exception& e) {
ready->setValue(LeakyResult<R>(
milvus::UnexpectedError, e.what()));
});
}
private:
std::shared_ptr<Ready<LeakyResult<R>>> ready_;
std::shared_ptr<folly::SharedPromise<R*>> promise_;
folly::CancellationSource cancellation_source_;
};
}; // namespace milvus::futures
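// Editor's sketch (illustrative): the consumer-side lifecycle of a Future, as
// exercised by test_futures.cpp. A locked std::mutex stands in for the locked
// Go mutex that the CGO layer passes in; names here are placeholders.
inline void
exampleConsumeFuture(folly::Executor::KeepAlive<> executor) {
    auto future = milvus::futures::Future<int>::async(
        executor,
        milvus::futures::ExecutePriority::NORMAL,
        [](milvus::futures::CancellationToken token) {
            token.throwIfCancelled();
            return new int(42);  // leaked on success, reclaimed below
        });
    // block until ready: the registered callback unlocks the mutex.
    std::mutex mu;
    mu.lock();
    future->registerReadyCallback(
        [](CLockedGoMutex* mutex) { ((std::mutex*)(mutex))->unlock(); },
        (CLockedGoMutex*)(&mu));
    mu.lock();
    auto [r, s] = future->leakyGet();
    if (s.error_code == 0) {
        delete static_cast<int*>(r);  // success: caller owns the result
    } else {
        free((char*)(s.error_msg));  // failure: caller frees the message
    }
}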

View File

@ -0,0 +1,112 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#pragma once
#include <optional>
#include <string>
#include <exception>
#include <memory>
#include <functional>
#include <common/type_c.h>
namespace milvus::futures {
/// @brief LeakyResult is a class that holds a result whose ownership can be leaked.
/// @tparam R is the type of the underlying result, which is leaked by the get operation.
template <class R>
class LeakyResult {
public:
/// @brief default-construct an empty LeakyResult; provided only for convenient construction.
LeakyResult() {
}
/// @brief create a LeakyResult with an error code and error message, which indicates failure.
/// @param error_code see the CStatus definition.
/// @param error_msg see the CStatus definition.
LeakyResult(int error_code, const std::string& error_msg) {
auto msg = strdup(error_msg.c_str());
status_ = std::make_optional(CStatus{error_code, msg});
}
/// @brief create a LeakyResult with a result, which indicates success.
/// @param r pointer to the result; ownership is transferred to the LeakyResult.
LeakyResult(R* r) : result_(std::make_optional(r)) {
}
LeakyResult(const LeakyResult<R>&) = delete;
LeakyResult(LeakyResult<R>&& other) noexcept {
if (other.result_.has_value()) {
result_ = std::move(other.result_);
other.result_.reset();
}
if (other.status_.has_value()) {
status_ = std::move(other.status_);
other.status_.reset();
}
}
LeakyResult&
operator=(const LeakyResult<R>&) = delete;
LeakyResult&
operator=(LeakyResult<R>&& other) noexcept {
if (this != &other) {
if (other.result_.has_value()) {
result_ = std::move(other.result_);
other.result_.reset();
}
if (other.status_.has_value()) {
status_ = std::move(other.status_);
other.status_.reset();
}
}
return *this;
}
/// @brief get the result or CStatus from the LeakyResult; memory is managed manually from this point on.
/// the caller is responsible for releasing the void* if it is not nullptr, and the CStatus error message if it is set.
/// @return a pair of void* and CStatus is returned, void* => R*.
/// exactly one of these holds: (void* == nullptr and CStatus is a failure) or (void* != nullptr and CStatus is a success).
/// for the release operation of CStatus, see common/type_c.h.
std::pair<void*, CStatus>
leakyGet() {
if (result_.has_value()) {
R* result_ptr = result_.value();
result_.reset();
return std::make_pair<void*, CStatus>(result_ptr,
CStatus{0, nullptr});
}
if (status_.has_value()) {
CStatus status = status_.value();
status_.reset();
return std::make_pair<void*, CStatus>(
nullptr, CStatus{status.error_code, status.error_msg});
}
throw std::logic_error("get on a not ready LeakyResult");
}
~LeakyResult() {
if (result_.has_value()) {
delete result_.value();
}
if (status_.has_value()) {
free((char*)(status_.value().error_msg));
}
}
private:
std::optional<CStatus> status_;
std::optional<R*> result_;
};
}; // namespace milvus::futures
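// Editor's sketch (illustrative): the two ownership outcomes of leakyGet(),
// as covered by the LeakyResult cases in test_futures.cpp.
inline void
exampleLeakyResult() {
    {
        milvus::futures::LeakyResult<int> ok(new int(1));
        auto [r, s] = ok.leakyGet();  // r != nullptr, s.error_code == 0
        delete static_cast<int*>(r);  // caller releases the leaked result
    }
    {
        milvus::futures::LeakyResult<int> failed(1, "error");
        auto [r, s] = failed.leakyGet();  // r == nullptr, s is a failure
        free((char*)(s.error_msg));       // caller releases the message
    }
}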

View File

@ -0,0 +1,97 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#pragma once
#include <mutex>
#include <optional>
#include <functional>
#include <vector>
namespace milvus::futures {
/// @brief Ready is a class that holds a value of type T.
/// the value can be set only once,
/// and callbacks may be registered to be invoked once the value is ready.
template <class T>
class Ready {
public:
Ready() : is_ready_(false){};
Ready(const Ready<T>&) = delete;
Ready(Ready<T>&&) noexcept = default;
Ready&
operator=(const Ready<T>&) = delete;
Ready&
operator=(Ready<T>&&) noexcept = default;
/// @brief set the value into Ready.
void
setValue(T&& value) {
mutex_.lock();
value_ = std::move(value);
is_ready_ = true;
std::vector<std::function<void()>> callbacks(std::move(callbacks_));
mutex_.unlock();
// perform all callbacks which were registered before the value became ready.
for (auto& callback : callbacks) {
callback();
}
}
/// @brief get the value from Ready.
/// @return ready value.
T
getValue() && {
std::lock_guard<std::mutex> lock(mutex_);
if (!is_ready_) {
throw std::runtime_error("Value is not ready");
}
auto v(std::move(value_.value()));
value_.reset();
return std::move(v);
}
/// @brief check if the value is ready.
bool
isReady() const {
const std::lock_guard<std::mutex> lock(mutex_);
return is_ready_;
}
/// @brief register a callback into Ready if the value is not ready; otherwise call it directly.
template <typename Fn, typename = std::enable_if<std::is_invocable_v<Fn>>>
void
callOrRegisterCallback(Fn&& fn) {
mutex_.lock();
// call if value is ready,
// otherwise register as a callback to be called when value is ready.
if (is_ready_) {
mutex_.unlock();
fn();
return;
}
callbacks_.push_back(std::forward<Fn>(fn));
mutex_.unlock();
}
private:
std::optional<T> value_;
mutable std::mutex mutex_;
std::vector<std::function<void()>> callbacks_;
bool is_ready_;
};
}; // namespace milvus::futures
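// Editor's sketch (illustrative): callback ordering semantics, as covered by
// the Ready test in test_futures.cpp. A callback registered before setValue
// is deferred until the value is set; one registered afterwards runs at once.
inline void
exampleReady() {
    milvus::futures::Ready<int> ready;
    int calls = 0;
    ready.callOrRegisterCallback([&calls]() { ++calls; });  // deferred
    ready.setValue(1);  // fires the deferred callback, calls == 1
    ready.callOrRegisterCallback([&calls]() { ++calls; });  // runs immediately
    int v = std::move(ready).getValue();  // v == 1, calls == 2
    (void)v;
}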

View File

@ -0,0 +1,60 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <memory>
#include "future_c.h"
#include "folly/init/Init.h"
#include "Future.h"
#include "Executor.h"
#include "log/Log.h"
extern "C" void
future_cancel(CFuture* future) {
static_cast<milvus::futures::IFuture*>(static_cast<void*>(future))
->cancel();
}
extern "C" bool
future_is_ready(CFuture* future) {
return static_cast<milvus::futures::IFuture*>(static_cast<void*>(future))
->isReady();
}
extern "C" void
future_register_ready_callback(CFuture* future,
CUnlockGoMutexFn unlockFn,
CLockedGoMutex* mutex) {
static_cast<milvus::futures::IFuture*>(static_cast<void*>(future))
->registerReadyCallback(unlockFn, mutex);
}
extern "C" CStatus
future_leak_and_get(CFuture* future, void** result) {
auto [r, s] =
static_cast<milvus::futures::IFuture*>(static_cast<void*>(future))
->leakyGet();
*result = r;
return s;
}
extern "C" void
future_destroy(CFuture* future) {
milvus::futures::IFuture::releaseLeakedFuture(
static_cast<milvus::futures::IFuture*>(static_cast<void*>(future)));
}
extern "C" void
executor_set_thread_num(int thread_num) {
milvus::futures::getGlobalCPUExecutor()->setNumThreads(thread_num);
LOG_INFO("future executor setup cpu executor with thread num: {}",
thread_num);
}

View File

@ -0,0 +1,47 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#pragma once
#include "future_c_types.h"
#include "common/type_c.h"
#ifdef __cplusplus
extern "C" {
#endif
void
future_cancel(CFuture* future);
bool
future_is_ready(CFuture* future);
void
future_register_ready_callback(CFuture* future,
CUnlockGoMutexFn unlockFn,
CLockedGoMutex* mutex);
CStatus
future_leak_and_get(CFuture* future, void** result);
// TODO: only for testing, add test macro for this function.
CFuture*
future_create_test_case(int interval, int loop_cnt, int caseNo);
void
future_destroy(CFuture* future);
void
executor_set_thread_num(int thread_num);
#ifdef __cplusplus
}
#endif
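// Editor's sketch (illustrative, C++ caller): driving this C API
// synchronously, mirroring the CRetrieve helper in the segcore tests. In
// production the locked mutex and unlock callback live on the Go side of
// CGO; here a std::mutex stands in (assumes <mutex> is included).
//
//   std::mutex mu;
//   mu.lock();
//   future_register_ready_callback(
//       future,
//       [](CLockedGoMutex* mutex) { ((std::mutex*)(mutex))->unlock(); },
//       (CLockedGoMutex*)(&mu));
//   mu.lock();  // blocks until the future unlocks the mutex
//   void* result = nullptr;
//   CStatus status = future_leak_and_get(future, &result);
//   future_destroy(future);  // release the leaked future object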

View File

@ -0,0 +1,26 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#pragma once
#ifdef __cplusplus
extern "C" {
#endif
typedef struct CFuture CFuture;
typedef struct CLockedGoMutex CLockedGoMutex;
typedef void (*CUnlockGoMutexFn)(CLockedGoMutex* mutex);
#ifdef __cplusplus
}
#endif

View File

@ -0,0 +1,42 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include "Future.h"
#include "Executor.h"
extern "C" CFuture*
future_create_test_case(int interval, int loop_cnt, int case_no) {
auto future = milvus::futures::Future<int>::async(
milvus::futures::getGlobalCPUExecutor(),
milvus::futures::ExecutePriority::HIGH,
[interval = interval, loop_cnt = loop_cnt, case_no = case_no](
milvus::futures::CancellationToken token) {
for (int i = 0; i < loop_cnt; i++) {
if (case_no != 0) {
token.throwIfCancelled();
}
std::this_thread::sleep_for(
std::chrono::milliseconds(interval));
}
switch (case_no) {
case 1:
throw std::runtime_error("case 1");
case 2:
throw folly::FutureNoExecutor();
case 3:
throw milvus::SegcoreError(milvus::UnexpectedError,
"case 3");
}
return new int(case_no);
});
return static_cast<CFuture*>(static_cast<void*>(
static_cast<milvus::futures::IFuture*>(future.release())));
}

View File

@ -0,0 +1,9 @@
libdir=@CMAKE_INSTALL_FULL_LIBDIR@
includedir=@CMAKE_INSTALL_FULL_INCLUDEDIR@
Name: Milvus Futures
Description: Futures modules for Milvus
Version: @MILVUS_VERSION@
Libs: -L${libdir} -lmilvus_futures
Cflags: -I${includedir}

View File

@ -42,6 +42,6 @@ set(SEGCORE_FILES
check_vec_index_c.cpp)
add_library(milvus_segcore SHARED ${SEGCORE_FILES})
target_link_libraries(milvus_segcore milvus_query milvus_bitset milvus_exec ${OpenMP_CXX_FLAGS} milvus-storage)
target_link_libraries(milvus_segcore milvus_query milvus_bitset milvus_exec ${OpenMP_CXX_FLAGS} milvus-storage milvus_futures)
install(TARGETS milvus_segcore DESTINATION "${CMAKE_INSTALL_LIBDIR}")

View File

@ -27,6 +27,8 @@
#include "segcore/SegmentSealedImpl.h"
#include "segcore/Utils.h"
#include "storage/Util.h"
#include "futures/Future.h"
#include "futures/Executor.h"
#include "storage/space.h"
////////////////////////////// common interfaces //////////////////////////////
@ -82,113 +84,129 @@ DeleteSearchResult(CSearchResult search_result) {
delete res;
}
CStatus
Search(CTraceContext c_trace,
CSegmentInterface c_segment,
CSearchPlan c_plan,
CPlaceholderGroup c_placeholder_group,
uint64_t timestamp,
CSearchResult* result) {
try {
auto segment = (milvus::segcore::SegmentInterface*)c_segment;
auto plan = (milvus::query::Plan*)c_plan;
auto phg_ptr = reinterpret_cast<const milvus::query::PlaceholderGroup*>(
c_placeholder_group);
CFuture* // Future<milvus::SearchResult*>
AsyncSearch(CTraceContext c_trace,
CSegmentInterface c_segment,
CSearchPlan c_plan,
CPlaceholderGroup c_placeholder_group,
uint64_t timestamp) {
auto segment = (milvus::segcore::SegmentInterface*)c_segment;
auto plan = (milvus::query::Plan*)c_plan;
auto phg_ptr = reinterpret_cast<const milvus::query::PlaceholderGroup*>(
c_placeholder_group);
// save trace context into search_info
auto& trace_ctx = plan->plan_node_->search_info_.trace_ctx_;
trace_ctx.traceID = c_trace.traceID;
trace_ctx.spanID = c_trace.spanID;
trace_ctx.traceFlags = c_trace.traceFlags;
auto future = milvus::futures::Future<milvus::SearchResult>::async(
milvus::futures::getGlobalCPUExecutor(),
milvus::futures::ExecutePriority::HIGH,
[c_trace, segment, plan, phg_ptr, timestamp](
milvus::futures::CancellationToken cancel_token) {
// save trace context into search_info
auto& trace_ctx = plan->plan_node_->search_info_.trace_ctx_;
trace_ctx.traceID = c_trace.traceID;
trace_ctx.spanID = c_trace.spanID;
trace_ctx.traceFlags = c_trace.traceFlags;
auto span = milvus::tracer::StartSpan("SegCoreSearch", &trace_ctx);
milvus::tracer::SetRootSpan(span);
auto span = milvus::tracer::StartSpan("SegCoreSearch", &trace_ctx);
milvus::tracer::SetRootSpan(span);
auto search_result = segment->Search(plan, phg_ptr, timestamp);
if (!milvus::PositivelyRelated(
plan->plan_node_->search_info_.metric_type_)) {
for (auto& dis : search_result->distances_) {
dis *= -1;
auto search_result = segment->Search(plan, phg_ptr, timestamp);
if (!milvus::PositivelyRelated(
plan->plan_node_->search_info_.metric_type_)) {
for (auto& dis : search_result->distances_) {
dis *= -1;
}
}
}
*result = search_result.release();
span->End();
milvus::tracer::CloseRootSpan();
return milvus::SuccessCStatus();
} catch (std::exception& e) {
return milvus::FailureCStatus(&e);
}
span->End();
milvus::tracer::CloseRootSpan();
return search_result.release();
});
return static_cast<CFuture*>(static_cast<void*>(
static_cast<milvus::futures::IFuture*>(future.release())));
}
void
DeleteRetrieveResult(CRetrieveResult* retrieve_result) {
std::free(const_cast<void*>(retrieve_result->proto_blob));
delete[] static_cast<uint8_t*>(
const_cast<void*>(retrieve_result->proto_blob));
delete retrieve_result;
}
CStatus
Retrieve(CTraceContext c_trace,
CSegmentInterface c_segment,
CRetrievePlan c_plan,
uint64_t timestamp,
CRetrieveResult* result,
int64_t limit_size,
bool ignore_non_pk) {
/// Create a leaked CRetrieveResult from a proto.
/// Should be released by DeleteRetrieveResult.
CRetrieveResult*
CreateLeakedCRetrieveResultFromProto(
std::unique_ptr<milvus::proto::segcore::RetrieveResults> retrieve_result) {
auto size = retrieve_result->ByteSizeLong();
auto buffer = new uint8_t[size];
try {
auto segment =
static_cast<milvus::segcore::SegmentInterface*>(c_segment);
auto plan = static_cast<const milvus::query::RetrievePlan*>(c_plan);
auto trace_ctx = milvus::tracer::TraceContext{
c_trace.traceID, c_trace.spanID, c_trace.traceFlags};
milvus::tracer::AutoSpan span("SegCoreRetrieve", &trace_ctx, true);
auto retrieve_result = segment->Retrieve(
&trace_ctx, plan, timestamp, limit_size, ignore_non_pk);
auto size = retrieve_result->ByteSizeLong();
std::unique_ptr<uint8_t[]> buffer(new uint8_t[size]);
retrieve_result->SerializePartialToArray(buffer.get(), size);
result->proto_blob = buffer.release();
result->proto_size = size;
return milvus::SuccessCStatus();
retrieve_result->SerializePartialToArray(buffer, size);
} catch (std::exception& e) {
return milvus::FailureCStatus(&e);
delete[] buffer;
throw;
}
auto result = new CRetrieveResult();
result->proto_blob = buffer;
result->proto_size = size;
return result;
}
CStatus
RetrieveByOffsets(CTraceContext c_trace,
CSegmentInterface c_segment,
CRetrievePlan c_plan,
CRetrieveResult* result,
int64_t* offsets,
int64_t len) {
try {
auto segment =
static_cast<milvus::segcore::SegmentInterface*>(c_segment);
auto plan = static_cast<const milvus::query::RetrievePlan*>(c_plan);
CFuture* // Future<CRetrieveResult>
AsyncRetrieve(CTraceContext c_trace,
CSegmentInterface c_segment,
CRetrievePlan c_plan,
uint64_t timestamp,
int64_t limit_size,
bool ignore_non_pk) {
auto segment = static_cast<milvus::segcore::SegmentInterface*>(c_segment);
auto plan = static_cast<const milvus::query::RetrievePlan*>(c_plan);
auto trace_ctx = milvus::tracer::TraceContext{
c_trace.traceID, c_trace.spanID, c_trace.traceFlags};
milvus::tracer::AutoSpan span(
"SegCoreRetrieveByOffsets", &trace_ctx, true);
auto future = milvus::futures::Future<CRetrieveResult>::async(
milvus::futures::getGlobalCPUExecutor(),
milvus::futures::ExecutePriority::HIGH,
[c_trace, segment, plan, timestamp, limit_size, ignore_non_pk](
milvus::futures::CancellationToken cancel_token) {
auto trace_ctx = milvus::tracer::TraceContext{
c_trace.traceID, c_trace.spanID, c_trace.traceFlags};
milvus::tracer::AutoSpan span("SegCoreRetrieve", &trace_ctx, true);
auto retrieve_result =
segment->Retrieve(&trace_ctx, plan, offsets, len);
auto retrieve_result = segment->Retrieve(
&trace_ctx, plan, timestamp, limit_size, ignore_non_pk);
auto size = retrieve_result->ByteSizeLong();
std::unique_ptr<uint8_t[]> buffer(new uint8_t[size]);
retrieve_result->SerializePartialToArray(buffer.get(), size);
return CreateLeakedCRetrieveResultFromProto(
std::move(retrieve_result));
});
return static_cast<CFuture*>(static_cast<void*>(
static_cast<milvus::futures::IFuture*>(future.release())));
}
result->proto_blob = buffer.release();
result->proto_size = size;
CFuture* // Future<CRetrieveResult>
AsyncRetrieveByOffsets(CTraceContext c_trace,
CSegmentInterface c_segment,
CRetrievePlan c_plan,
int64_t* offsets,
int64_t len) {
auto segment = static_cast<milvus::segcore::SegmentInterface*>(c_segment);
auto plan = static_cast<const milvus::query::RetrievePlan*>(c_plan);
return milvus::SuccessCStatus();
} catch (std::exception& e) {
return milvus::FailureCStatus(&e);
}
auto future = milvus::futures::Future<CRetrieveResult>::async(
milvus::futures::getGlobalCPUExecutor(),
milvus::futures::ExecutePriority::HIGH,
[c_trace, segment, plan, offsets, len](
milvus::futures::CancellationToken cancel_token) {
auto trace_ctx = milvus::tracer::TraceContext{
c_trace.traceID, c_trace.spanID, c_trace.traceFlags};
milvus::tracer::AutoSpan span(
"SegCoreRetrieveByOffsets", &trace_ctx, true);
auto retrieve_result =
segment->Retrieve(&trace_ctx, plan, offsets, len);
return CreateLeakedCRetrieveResultFromProto(
std::move(retrieve_result));
});
return static_cast<CFuture*>(static_cast<void*>(
static_cast<milvus::futures::IFuture*>(future.release())));
}
int64_t

View File

@ -20,6 +20,7 @@ extern "C" {
#include <stdint.h>
#include "common/type_c.h"
#include "futures/future_c.h"
#include "segcore/plan_c.h"
#include "segcore/load_index_c.h"
#include "segcore/load_field_data_c.h"
@ -43,33 +44,30 @@ ClearSegmentData(CSegmentInterface c_segment);
void
DeleteSearchResult(CSearchResult search_result);
CStatus
Search(CTraceContext c_trace,
CSegmentInterface c_segment,
CSearchPlan c_plan,
CPlaceholderGroup c_placeholder_group,
uint64_t timestamp,
CSearchResult* result);
CFuture* // Future<CSearchResultBody>
AsyncSearch(CTraceContext c_trace,
CSegmentInterface c_segment,
CSearchPlan c_plan,
CPlaceholderGroup c_placeholder_group,
uint64_t timestamp);
void
DeleteRetrieveResult(CRetrieveResult* retrieve_result);
CStatus
Retrieve(CTraceContext c_trace,
CSegmentInterface c_segment,
CRetrievePlan c_plan,
uint64_t timestamp,
CRetrieveResult* result,
int64_t limit_size,
bool ignore_non_pk);
CFuture* // Future<CRetrieveResult>
AsyncRetrieve(CTraceContext c_trace,
CSegmentInterface c_segment,
CRetrievePlan c_plan,
uint64_t timestamp,
int64_t limit_size,
bool ignore_non_pk);
CStatus
RetrieveByOffsets(CTraceContext c_trace,
CSegmentInterface c_segment,
CRetrievePlan c_plan,
CRetrieveResult* result,
int64_t* offsets,
int64_t len);
CFuture* // Future<CRetrieveResult>
AsyncRetrieveByOffsets(CTraceContext c_trace,
CSegmentInterface c_segment,
CRetrievePlan c_plan,
int64_t* offsets,
int64_t len);
int64_t
GetMemoryUsageInBytes(CSegmentInterface c_segment);

View File

@ -69,6 +69,7 @@ set(MILVUS_TEST_FILES
test_array_inverted_index.cpp
test_chunk_vector.cpp
test_mmap_chunk_manager.cpp
test_futures.cpp
)
if ( INDEX_ENGINE STREQUAL "cardinal" )

View File

@ -11,6 +11,7 @@
#include <gtest/gtest.h>
#include "folly/init/Init.h"
#include "test_utils/Constants.h"
#include "storage/LocalChunkManagerSingleton.h"
#include "storage/RemoteChunkManagerSingleton.h"
@ -19,6 +20,8 @@
int
main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
folly::Init follyInit(&argc, &argv, false);
milvus::storage::LocalChunkManagerSingleton::GetInstance().Init(
TestLocalPath);
milvus::storage::RemoteChunkManagerSingleton::GetInstance().Init(

View File

@ -34,6 +34,7 @@
#include "segcore/Reduce.h"
#include "segcore/reduce_c.h"
#include "segcore/segment_c.h"
#include "futures/Future.h"
#include "test_utils/DataGen.h"
#include "test_utils/PbHelper.h"
#include "test_utils/indexbuilder_test_utils.h"
@ -64,14 +65,50 @@ CStatus
CRetrieve(CSegmentInterface c_segment,
CRetrievePlan c_plan,
uint64_t timestamp,
CRetrieveResult* result) {
return Retrieve({},
c_segment,
c_plan,
timestamp,
result,
DEFAULT_MAX_OUTPUT_SIZE,
false);
CRetrieveResult** result) {
auto future = AsyncRetrieve(
{}, c_segment, c_plan, timestamp, DEFAULT_MAX_OUTPUT_SIZE, false);
auto futurePtr = static_cast<milvus::futures::IFuture*>(
static_cast<void*>(static_cast<CFuture*>(future)));
std::mutex mu;
mu.lock();
futurePtr->registerReadyCallback(
[](CLockedGoMutex* mutex) { ((std::mutex*)(mutex))->unlock(); },
(CLockedGoMutex*)(&mu));
mu.lock();
auto [retrieveResult, status] = futurePtr->leakyGet();
if (status.error_code != 0) {
return status;
}
*result = static_cast<CRetrieveResult*>(retrieveResult);
return status;
}
CStatus
CRetrieveByOffsets(CSegmentInterface c_segment,
CRetrievePlan c_plan,
int64_t* offsets,
int64_t len,
CRetrieveResult** result) {
auto future = AsyncRetrieveByOffsets({}, c_segment, c_plan, offsets, len);
auto futurePtr = static_cast<milvus::futures::IFuture*>(
static_cast<void*>(static_cast<CFuture*>(future)));
std::mutex mu;
mu.lock();
futurePtr->registerReadyCallback(
[](CLockedGoMutex* mutex) { ((std::mutex*)(mutex))->unlock(); },
(CLockedGoMutex*)(&mu));
mu.lock();
auto [retrieveResult, status] = futurePtr->leakyGet();
if (status.error_code != 0) {
return status;
}
*result = static_cast<CRetrieveResult*>(retrieveResult);
return status;
}
const char*
@ -609,15 +646,16 @@ TEST(CApiTest, MultiDeleteGrowingSegment) {
plan->field_ids_ = target_field_ids;
auto max_ts = dataset.timestamps_[N - 1] + 10;
CRetrieveResult retrieve_result;
CRetrieveResult* retrieve_result = nullptr;
res = CRetrieve(segment, plan.get(), max_ts, &retrieve_result);
ASSERT_EQ(res.error_code, Success);
auto query_result = std::make_unique<proto::segcore::RetrieveResults>();
auto suc = query_result->ParseFromArray(retrieve_result.proto_blob,
retrieve_result.proto_size);
auto suc = query_result->ParseFromArray(retrieve_result->proto_blob,
retrieve_result->proto_size);
ASSERT_TRUE(suc);
ASSERT_EQ(query_result->ids().int_id().data().size(), 0);
DeleteRetrieveResult(&retrieve_result);
DeleteRetrieveResult(retrieve_result);
retrieve_result = nullptr;
// retrieve pks = {2}
{
@ -633,11 +671,12 @@ TEST(CApiTest, MultiDeleteGrowingSegment) {
std::make_shared<plan::FilterBitsNode>(DEFAULT_PLANNODE_ID, term_expr);
res = CRetrieve(segment, plan.get(), max_ts, &retrieve_result);
ASSERT_EQ(res.error_code, Success);
suc = query_result->ParseFromArray(retrieve_result.proto_blob,
retrieve_result.proto_size);
suc = query_result->ParseFromArray(retrieve_result->proto_blob,
retrieve_result->proto_size);
ASSERT_TRUE(suc);
ASSERT_EQ(query_result->ids().int_id().data().size(), 1);
DeleteRetrieveResult(&retrieve_result);
DeleteRetrieveResult(retrieve_result);
retrieve_result = nullptr;
// delete pks = {2}
delete_pks = {2};
@ -658,13 +697,13 @@ TEST(CApiTest, MultiDeleteGrowingSegment) {
// retrieve pks in {2}
res = CRetrieve(segment, plan.get(), max_ts, &retrieve_result);
ASSERT_EQ(res.error_code, Success);
suc = query_result->ParseFromArray(retrieve_result.proto_blob,
retrieve_result.proto_size);
suc = query_result->ParseFromArray(retrieve_result->proto_blob,
retrieve_result->proto_size);
ASSERT_TRUE(suc);
ASSERT_EQ(query_result->ids().int_id().data().size(), 0);
DeleteRetrievePlan(plan.release());
DeleteRetrieveResult(&retrieve_result);
DeleteRetrieveResult(retrieve_result);
DeleteCollection(collection);
DeleteSegment(segment);
@ -721,15 +760,16 @@ TEST(CApiTest, MultiDeleteSealedSegment) {
plan->field_ids_ = target_field_ids;
auto max_ts = dataset.timestamps_[N - 1] + 10;
CRetrieveResult retrieve_result;
CRetrieveResult* retrieve_result = nullptr;
auto res = CRetrieve(segment, plan.get(), max_ts, &retrieve_result);
ASSERT_EQ(res.error_code, Success);
auto query_result = std::make_unique<proto::segcore::RetrieveResults>();
auto suc = query_result->ParseFromArray(retrieve_result.proto_blob,
retrieve_result.proto_size);
auto suc = query_result->ParseFromArray(retrieve_result->proto_blob,
retrieve_result->proto_size);
ASSERT_TRUE(suc);
ASSERT_EQ(query_result->ids().int_id().data().size(), 0);
DeleteRetrieveResult(&retrieve_result);
DeleteRetrieveResult(retrieve_result);
retrieve_result = nullptr;
// retrieve pks = {2}
{
@ -745,11 +785,12 @@ TEST(CApiTest, MultiDeleteSealedSegment) {
std::make_shared<plan::FilterBitsNode>(DEFAULT_PLANNODE_ID, term_expr);
res = CRetrieve(segment, plan.get(), max_ts, &retrieve_result);
ASSERT_EQ(res.error_code, Success);
suc = query_result->ParseFromArray(retrieve_result.proto_blob,
retrieve_result.proto_size);
suc = query_result->ParseFromArray(retrieve_result->proto_blob,
retrieve_result->proto_size);
ASSERT_TRUE(suc);
ASSERT_EQ(query_result->ids().int_id().data().size(), 1);
DeleteRetrieveResult(&retrieve_result);
DeleteRetrieveResult(retrieve_result);
retrieve_result = nullptr;
// delete pks = {2}
delete_pks = {2};
@ -770,13 +811,13 @@ TEST(CApiTest, MultiDeleteSealedSegment) {
// retrieve pks in {2}
res = CRetrieve(segment, plan.get(), max_ts, &retrieve_result);
ASSERT_EQ(res.error_code, Success);
suc = query_result->ParseFromArray(retrieve_result.proto_blob,
retrieve_result.proto_size);
suc = query_result->ParseFromArray(retrieve_result->proto_blob,
retrieve_result->proto_size);
ASSERT_TRUE(suc);
ASSERT_EQ(query_result->ids().int_id().data().size(), 0);
DeleteRetrievePlan(plan.release());
DeleteRetrieveResult(&retrieve_result);
DeleteRetrieveResult(retrieve_result);
DeleteCollection(collection);
DeleteSegment(segment);
@ -839,16 +880,17 @@ TEST(CApiTest, DeleteRepeatedPksFromGrowingSegment) {
std::vector<FieldId> target_field_ids{FieldId(100), FieldId(101)};
plan->field_ids_ = target_field_ids;
CRetrieveResult retrieve_result;
CRetrieveResult* retrieve_result = nullptr;
res = CRetrieve(
segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result);
ASSERT_EQ(res.error_code, Success);
auto query_result = std::make_unique<proto::segcore::RetrieveResults>();
auto suc = query_result->ParseFromArray(retrieve_result.proto_blob,
retrieve_result.proto_size);
auto suc = query_result->ParseFromArray(retrieve_result->proto_blob,
retrieve_result->proto_size);
ASSERT_TRUE(suc);
ASSERT_EQ(query_result->ids().int_id().data().size(), 3);
DeleteRetrieveResult(&retrieve_result);
DeleteRetrieveResult(retrieve_result);
retrieve_result = nullptr;
// delete data pks = {1, 2, 3}
std::vector<int64_t> delete_row_ids = {1, 2, 3};
@ -873,13 +915,14 @@ TEST(CApiTest, DeleteRepeatedPksFromGrowingSegment) {
ASSERT_EQ(res.error_code, Success);
query_result = std::make_unique<proto::segcore::RetrieveResults>();
suc = query_result->ParseFromArray(retrieve_result.proto_blob,
retrieve_result.proto_size);
suc = query_result->ParseFromArray(retrieve_result->proto_blob,
retrieve_result->proto_size);
ASSERT_TRUE(suc);
ASSERT_EQ(query_result->ids().int_id().data().size(), 0);
DeleteRetrievePlan(plan.release());
DeleteRetrieveResult(&retrieve_result);
DeleteRetrieveResult(retrieve_result);
retrieve_result = nullptr;
DeleteCollection(collection);
DeleteSegment(segment);
@ -920,16 +963,17 @@ TEST(CApiTest, DeleteRepeatedPksFromSealedSegment) {
std::vector<FieldId> target_field_ids{FieldId(100), FieldId(101)};
plan->field_ids_ = target_field_ids;
CRetrieveResult retrieve_result;
CRetrieveResult* retrieve_result = nullptr;
auto res = CRetrieve(
segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result);
ASSERT_EQ(res.error_code, Success);
auto query_result = std::make_unique<proto::segcore::RetrieveResults>();
auto suc = query_result->ParseFromArray(retrieve_result.proto_blob,
retrieve_result.proto_size);
auto suc = query_result->ParseFromArray(retrieve_result->proto_blob,
retrieve_result->proto_size);
ASSERT_TRUE(suc);
ASSERT_EQ(query_result->ids().int_id().data().size(), 6);
DeleteRetrieveResult(&retrieve_result);
DeleteRetrieveResult(retrieve_result);
retrieve_result = nullptr;
// delete data pks = {1, 2, 3}
std::vector<int64_t> delete_row_ids = {1, 2, 3};
@ -955,13 +999,13 @@ TEST(CApiTest, DeleteRepeatedPksFromSealedSegment) {
ASSERT_EQ(res.error_code, Success);
query_result = std::make_unique<proto::segcore::RetrieveResults>();
suc = query_result->ParseFromArray(retrieve_result.proto_blob,
retrieve_result.proto_size);
suc = query_result->ParseFromArray(retrieve_result->proto_blob,
retrieve_result->proto_size);
ASSERT_TRUE(suc);
ASSERT_EQ(query_result->ids().int_id().data().size(), 0);
DeleteRetrievePlan(plan.release());
DeleteRetrieveResult(&retrieve_result);
DeleteRetrieveResult(retrieve_result);
DeleteCollection(collection);
DeleteSegment(segment);
@ -1030,16 +1074,17 @@ TEST(CApiTest, InsertSamePkAfterDeleteOnGrowingSegment) {
std::vector<FieldId> target_field_ids{FieldId(100), FieldId(101)};
plan->field_ids_ = target_field_ids;
CRetrieveResult retrieve_result;
CRetrieveResult* retrieve_result = nullptr;
res = CRetrieve(
segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result);
ASSERT_EQ(res.error_code, Success);
auto query_result = std::make_unique<proto::segcore::RetrieveResults>();
auto suc = query_result->ParseFromArray(retrieve_result.proto_blob,
retrieve_result.proto_size);
auto suc = query_result->ParseFromArray(retrieve_result->proto_blob,
retrieve_result->proto_size);
ASSERT_TRUE(suc);
ASSERT_EQ(query_result->ids().int_id().data().size(), 0);
DeleteRetrieveResult(&retrieve_result);
DeleteRetrieveResult(retrieve_result);
retrieve_result = nullptr;
// second insert data
// insert data with pks = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9} , timestamps = {10, 11, 12, 13, 14, 15, 16, 17, 18, 19}
@ -1061,13 +1106,13 @@ TEST(CApiTest, InsertSamePkAfterDeleteOnGrowingSegment) {
ASSERT_EQ(res.error_code, Success);
query_result = std::make_unique<proto::segcore::RetrieveResults>();
suc = query_result->ParseFromArray(retrieve_result.proto_blob,
retrieve_result.proto_size);
suc = query_result->ParseFromArray(retrieve_result->proto_blob,
retrieve_result->proto_size);
ASSERT_TRUE(suc);
ASSERT_EQ(query_result->ids().int_id().data().size(), 3);
DeleteRetrievePlan(plan.release());
DeleteRetrieveResult(&retrieve_result);
DeleteRetrieveResult(retrieve_result);
DeleteCollection(collection);
DeleteSegment(segment);
@ -1127,18 +1172,19 @@ TEST(CApiTest, InsertSamePkAfterDeleteOnSealedSegment) {
std::vector<FieldId> target_field_ids{FieldId(100), FieldId(101)};
plan->field_ids_ = target_field_ids;
CRetrieveResult retrieve_result;
CRetrieveResult* retrieve_result = nullptr;
auto res = CRetrieve(
segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result);
ASSERT_EQ(res.error_code, Success);
auto query_result = std::make_unique<proto::segcore::RetrieveResults>();
auto suc = query_result->ParseFromArray(retrieve_result.proto_blob,
retrieve_result.proto_size);
auto suc = query_result->ParseFromArray(retrieve_result->proto_blob,
retrieve_result->proto_size);
ASSERT_TRUE(suc);
ASSERT_EQ(query_result->ids().int_id().data().size(), 4);
DeleteRetrievePlan(plan.release());
DeleteRetrieveResult(&retrieve_result);
DeleteRetrieveResult(retrieve_result);
retrieve_result = nullptr;
DeleteCollection(collection);
DeleteSegment(segment);
@ -1324,13 +1370,21 @@ TEST(CApiTest, RetrieveTestWithExpr) {
std::vector<FieldId> target_field_ids{FieldId(100), FieldId(101)};
plan->field_ids_ = target_field_ids;
CRetrieveResult retrieve_result;
CRetrieveResult* retrieve_result = nullptr;
auto res = CRetrieve(
segment, plan.get(), dataset.timestamps_[0], &retrieve_result);
ASSERT_EQ(res.error_code, Success);
// Test Retrieve by offsets.
int64_t offsets[] = {0, 1, 2};
CRetrieveResult* retrieve_by_offsets_result = nullptr;
res = CRetrieveByOffsets(
segment, plan.get(), offsets, 3, &retrieve_by_offsets_result);
ASSERT_EQ(res.error_code, Success);
DeleteRetrievePlan(plan.release());
DeleteRetrieveResult(&retrieve_result);
DeleteRetrieveResult(retrieve_result);
DeleteRetrieveResult(retrieve_by_offsets_result);
DeleteCollection(collection);
DeleteSegment(segment);
}
@ -4324,13 +4378,13 @@ TEST(CApiTest, RetriveScalarFieldFromSealedSegmentWithIndex) {
i8_fid, i16_fid, i32_fid, i64_fid, float_fid, double_fid};
plan->field_ids_ = target_field_ids;
CRetrieveResult retrieve_result;
CRetrieveResult* retrieve_result = nullptr;
res = CRetrieve(
segment, plan.get(), raw_data.timestamps_[N - 1], &retrieve_result);
ASSERT_EQ(res.error_code, Success);
auto query_result = std::make_unique<proto::segcore::RetrieveResults>();
auto suc = query_result->ParseFromArray(retrieve_result.proto_blob,
retrieve_result.proto_size);
auto suc = query_result->ParseFromArray(retrieve_result->proto_blob,
retrieve_result->proto_size);
ASSERT_TRUE(suc);
ASSERT_EQ(query_result->fields_data().size(), 6);
auto fields_data = query_result->fields_data();
@ -4369,7 +4423,7 @@ TEST(CApiTest, RetriveScalarFieldFromSealedSegmentWithIndex) {
}
DeleteRetrievePlan(plan.release());
DeleteRetrieveResult(&retrieve_result);
DeleteRetrieveResult(retrieve_result);
DeleteSegment(segment);
}

View File

@ -0,0 +1,211 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <gtest/gtest.h>
#include "futures/Future.h"
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <stdlib.h>
#include <mutex>
#include <exception>
using namespace milvus::futures;
TEST(Futures, LeakyResult) {
{
LeakyResult<int> leaky_result;
ASSERT_ANY_THROW(leaky_result.leakyGet());
}
{
auto leaky_result = LeakyResult<int>(1, "error");
auto [r, s] = leaky_result.leakyGet();
ASSERT_EQ(r, nullptr);
ASSERT_EQ(s.error_code, 1);
ASSERT_STREQ(s.error_msg, "error");
free((char*)(s.error_msg));
}
{
auto leaky_result = LeakyResult<int>(new int(1));
auto [r, s] = leaky_result.leakyGet();
ASSERT_NE(r, nullptr);
ASSERT_EQ(*(int*)(r), 1);
ASSERT_EQ(s.error_code, 0);
ASSERT_EQ(s.error_msg, nullptr);
delete (int*)(r);
}
{
LeakyResult<int> leaky_result(1, "error");
LeakyResult<int> leaky_result_moved(std::move(leaky_result));
auto [r, s] = leaky_result_moved.leakyGet();
ASSERT_EQ(r, nullptr);
ASSERT_EQ(s.error_code, 1);
ASSERT_STREQ(s.error_msg, "error");
free((char*)(s.error_msg));
}
{
LeakyResult<int> leaky_result(1, "error");
LeakyResult<int> leaky_result_moved;
leaky_result_moved = std::move(leaky_result);
auto [r, s] = leaky_result_moved.leakyGet();
ASSERT_EQ(r, nullptr);
ASSERT_EQ(s.error_code, 1);
ASSERT_STREQ(s.error_msg, "error");
free((char*)(s.error_msg));
}
}
TEST(Futures, Ready) {
Ready<int> ready;
int a = 0;
ready.callOrRegisterCallback([&a]() { a++; });
ASSERT_EQ(a, 0);
ASSERT_FALSE(ready.isReady());
ready.setValue(1);
ASSERT_EQ(a, 1);
ASSERT_TRUE(ready.isReady());
ready.callOrRegisterCallback([&a]() { a++; });
ASSERT_EQ(a, 2);
ASSERT_EQ(std::move(ready).getValue(), 1);
}
TEST(Futures, Future) {
folly::CPUThreadPoolExecutor executor(2);
// success path.
{
// try a async function
auto future = milvus::futures::Future<int>::async(
&executor, 0, [](milvus::futures::CancellationToken token) {
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
return new int(1);
});
ASSERT_FALSE(future->isReady());
std::mutex mu;
mu.lock();
future->registerReadyCallback(
[](CLockedGoMutex* mutex) { ((std::mutex*)(mutex))->unlock(); },
(CLockedGoMutex*)(&mu));
mu.lock();
ASSERT_TRUE(future->isReady());
auto [r, s] = future->leakyGet();
ASSERT_NE(r, nullptr);
ASSERT_EQ(*(int*)(r), 1);
ASSERT_EQ(s.error_code, 0);
ASSERT_EQ(s.error_msg, nullptr);
delete (int*)(r);
}
// error path.
{
// try a async function
auto future = milvus::futures::Future<int>::async(
&executor, 0, [](milvus::futures::CancellationToken token) {
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
throw milvus::SegcoreError(milvus::NotImplemented,
"unimplemented");
return new int(1);
});
ASSERT_FALSE(future->isReady());
std::mutex mu;
mu.lock();
future->registerReadyCallback(
[](CLockedGoMutex* mutex) { ((std::mutex*)(mutex))->unlock(); },
(CLockedGoMutex*)(&mu));
mu.lock();
ASSERT_TRUE(future->isReady());
auto [r, s] = future->leakyGet();
ASSERT_EQ(r, nullptr);
ASSERT_EQ(s.error_code, milvus::NotImplemented);
ASSERT_STREQ(s.error_msg, "unimplemented");
free((char*)(s.error_msg));
}
{
// try a async function
auto future = milvus::futures::Future<int>::async(
&executor, 0, [](milvus::futures::CancellationToken token) {
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
throw std::runtime_error("unimplemented");
return new int(1);
});
ASSERT_FALSE(future->isReady());
std::mutex mu;
mu.lock();
future->registerReadyCallback(
[](CLockedGoMutex* mutex) { ((std::mutex*)(mutex))->unlock(); },
(CLockedGoMutex*)(&mu));
mu.lock();
ASSERT_TRUE(future->isReady());
auto [r, s] = future->leakyGet();
ASSERT_EQ(r, nullptr);
ASSERT_EQ(s.error_code, milvus::UnexpectedError);
free((char*)(s.error_msg));
}
{
// try a async function
auto future = milvus::futures::Future<int>::async(
&executor, 0, [](milvus::futures::CancellationToken token) {
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
throw folly::FutureNotReady();
return new int(1);
});
ASSERT_FALSE(future->isReady());
std::mutex mu;
mu.lock();
future->registerReadyCallback(
[](CLockedGoMutex* mutex) { ((std::mutex*)(mutex))->unlock(); },
(CLockedGoMutex*)(&mu));
mu.lock();
ASSERT_TRUE(future->isReady());
auto [r, s] = future->leakyGet();
ASSERT_EQ(r, nullptr);
ASSERT_EQ(s.error_code, milvus::FollyOtherException);
free((char*)(s.error_msg));
}
// cancellation path.
{
// try a async function
auto future = milvus::futures::Future<int>::async(
&executor, 0, [](milvus::futures::CancellationToken token) {
for (int i = 0; i < 10; i++) {
token.throwIfCancelled();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
return new int(1);
});
ASSERT_FALSE(future->isReady());
future->cancel();
std::mutex mu;
mu.lock();
future->registerReadyCallback(
[](CLockedGoMutex* mutex) { ((std::mutex*)(mutex))->unlock(); },
(CLockedGoMutex*)(&mu));
mu.lock();
ASSERT_TRUE(future->isReady());
auto [r, s] = future->leakyGet();
ASSERT_EQ(r, nullptr);
ASSERT_EQ(s.error_code, milvus::FollyCancel);
free((char*)(s.error_msg));
}
}

View File

@ -609,10 +609,10 @@ TEST(GroupBY, Reduce) {
CSearchResult c_search_res_1;
CSearchResult c_search_res_2;
auto status =
Search({}, c_segment_1, c_plan, c_ph_group, 1L << 63, &c_search_res_1);
CSearch(c_segment_1, c_plan, c_ph_group, 1L << 63, &c_search_res_1);
ASSERT_EQ(status.error_code, Success);
status =
Search({}, c_segment_2, c_plan, c_ph_group, 1L << 63, &c_search_res_2);
CSearch(c_segment_2, c_plan, c_ph_group, 1L << 63, &c_search_res_2);
ASSERT_EQ(status.error_code, Success);
std::vector<CSearchResult> results;
results.push_back(c_search_res_1);

View File

@ -28,6 +28,7 @@
#include "segcore/Reduce.h"
#include "segcore/reduce_c.h"
#include "segcore/segment_c.h"
#include "futures/Future.h"
#include "DataGen.h"
#include "PbHelper.h"
#include "c_api_test_utils.h"
@ -147,8 +148,24 @@ CSearch(CSegmentInterface c_segment,
CPlaceholderGroup c_placeholder_group,
uint64_t timestamp,
CSearchResult* result) {
return Search(
{}, c_segment, c_plan, c_placeholder_group, timestamp, result);
auto future =
AsyncSearch({}, c_segment, c_plan, c_placeholder_group, timestamp);
auto futurePtr = static_cast<milvus::futures::IFuture*>(
static_cast<void*>(static_cast<CFuture*>(future)));
std::mutex mu;
mu.lock();
futurePtr->registerReadyCallback(
[](CLockedGoMutex* mutex) { ((std::mutex*)(mutex))->unlock(); },
(CLockedGoMutex*)(&mu));
mu.lock();
auto [searchResult, status] = futurePtr->leakyGet();
if (status.error_code != 0) {
return status;
}
*result = static_cast<CSearchResult>(searchResult);
return status;
}
} // namespace

View File

@ -28,6 +28,7 @@ import "C"
import (
"context"
"math"
"unsafe"
"github.com/golang/protobuf/proto"
@ -55,14 +56,9 @@ func HandleCStatus(ctx context.Context, status *C.CStatus, extraInfo string, fie
return err
}
// HandleCProto deal with the result proto returned from CGO
func HandleCProto(cRes *C.CProto, msg proto.Message) error {
// Standalone CProto is protobuf created by C side,
// Passed from c side
// memory is managed manually
lease, blob := cgoconverter.UnsafeGoBytes(&cRes.proto_blob, int(cRes.proto_size))
defer cgoconverter.Release(lease)
// UnmarshalCProto unmarshal the proto from C memory
func UnmarshalCProto(cRes *C.CProto, msg proto.Message) error {
blob := (*(*[math.MaxInt32]byte)(cRes.proto_blob))[:int(cRes.proto_size):int(cRes.proto_size)]
return proto.Unmarshal(blob, msg)
}
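The cast in UnmarshalCProto is easy to misread, so here is a hedged, self-contained sketch of the same zero-copy pattern. The buffer is Go-owned here purely so the snippet runs; in the real helper the blob is C-owned, so the resulting slice must not outlive the CProto. Since Go 1.17, unsafe.Slice is an equivalent spelling of the (*[math.MaxInt32]byte) cast used above.

package main

import (
	"fmt"
	"unsafe"
)

func main() {
	// Stand-in for C-owned memory described by a CProto (blob + size).
	buf := []byte("hello proto bytes")
	blob := unsafe.Pointer(&buf[0])
	size := len(buf)

	// Zero-copy view: len == cap == size, so append can never write past
	// the underlying buffer. Equivalent to
	// (*(*[math.MaxInt32]byte)(blob))[:size:size] in the code above.
	view := unsafe.Slice((*byte)(blob), size)
	fmt.Println(string(view))
}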

View File

@ -163,6 +163,10 @@ func (suite *RetrieveSuite) TestRetrieveSealed() {
suite.NoError(err)
suite.Len(res[0].Result.Offset, 3)
suite.manager.Segment.Unpin(segments)
resultByOffsets, err := suite.sealed.RetrieveByOffsets(context.Background(), plan, []int64{0, 1})
suite.NoError(err)
suite.Len(resultByOffsets.Offset, 0)
}
func (suite *RetrieveSuite) TestRetrieveGrowing() {
@ -182,6 +186,10 @@ func (suite *RetrieveSuite) TestRetrieveGrowing() {
suite.NoError(err)
suite.Len(res[0].Result.Offset, 3)
suite.manager.Segment.Unpin(segments)
resultByOffsets, err := suite.growing.RetrieveByOffsets(context.Background(), plan, []int64{0, 1})
suite.NoError(err)
suite.Len(resultByOffsets.Offset, 0)
}
func (suite *RetrieveSuite) TestRetrieveStreamSealed() {

View File

@ -17,8 +17,9 @@
package segments
/*
#cgo pkg-config: milvus_segcore
#cgo pkg-config: milvus_segcore milvus_futures
#include "futures/future_c.h"
#include "segcore/collection_c.h"
#include "segcore/plan_c.h"
#include "segcore/reduce_c.h"
@ -53,6 +54,7 @@ import (
"github.com/milvus-io/milvus/internal/querynodev2/pkoracle"
"github.com/milvus-io/milvus/internal/querynodev2/segments/state"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/cgo"
typeutil_internal "github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
@ -565,34 +567,39 @@ func (s *LocalSegment) Search(ctx context.Context, searchReq *SearchRequest) (*S
defer s.ptrLock.RUnlock()
traceCtx := ParseCTraceContext(ctx)
defer runtime.KeepAlive(traceCtx)
defer runtime.KeepAlive(searchReq)
hasIndex := s.ExistIndex(searchReq.searchFieldID)
log = log.With(zap.Bool("withIndex", hasIndex))
log.Debug("search segment...")
var searchResult SearchResult
var status C.CStatus
GetSQPool().Submit(func() (any, error) {
tr := timerecord.NewTimeRecorder("cgoSearch")
status = C.Search(traceCtx.ctx,
s.ptr,
searchReq.plan.cSearchPlan,
searchReq.cPlaceholderGroup,
C.uint64_t(searchReq.mvccTimestamp),
&searchResult.cSearchResult,
)
runtime.KeepAlive(traceCtx)
metrics.QueryNodeSQSegmentLatencyInCore.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
return nil, nil
}).Await()
if err := HandleCStatus(ctx, &status, "Search failed",
zap.Int64("collectionID", s.Collection()),
zap.Int64("segmentID", s.ID()),
zap.String("segmentType", s.segmentType.String())); err != nil {
tr := timerecord.NewTimeRecorder("cgoSearch")
future := cgo.Async(
ctx,
func() cgo.CFuturePtr {
return (cgo.CFuturePtr)(C.AsyncSearch(
traceCtx.ctx,
s.ptr,
searchReq.plan.cSearchPlan,
searchReq.cPlaceholderGroup,
C.uint64_t(searchReq.mvccTimestamp),
))
},
cgo.WithName("search"),
)
defer future.Release()
result, err := future.BlockAndLeakyGet()
if err != nil {
log.Warn("Search failed")
return nil, err
}
metrics.QueryNodeSQSegmentLatencyInCore.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
log.Debug("search segment done")
return &searchResult, nil
return &SearchResult{
cSearchResult: (C.CSearchResult)(result),
}, nil
}
func (s *LocalSegment) Retrieve(ctx context.Context, plan *RetrievePlan) (*segcorepb.RetrieveResults, error) {
@ -612,69 +619,65 @@ func (s *LocalSegment) Retrieve(ctx context.Context, plan *RetrievePlan) (*segco
log.Debug("begin to retrieve")
traceCtx := ParseCTraceContext(ctx)
defer runtime.KeepAlive(traceCtx)
defer runtime.KeepAlive(plan)
maxLimitSize := paramtable.Get().QuotaConfig.MaxOutputSize.GetAsInt64()
var retrieveResult RetrieveResult
var status C.CStatus
GetSQPool().Submit(func() (any, error) {
ts := C.uint64_t(plan.Timestamp)
tr := timerecord.NewTimeRecorder("cgoRetrieve")
status = C.Retrieve(traceCtx.ctx,
s.ptr,
plan.cRetrievePlan,
ts,
&retrieveResult.cRetrieveResult,
C.int64_t(maxLimitSize),
C.bool(plan.ignoreNonPk))
runtime.KeepAlive(traceCtx)
tr := timerecord.NewTimeRecorder("cgoRetrieve")
metrics.QueryNodeSQSegmentLatencyInCore.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()),
metrics.QueryLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
log.Debug("cgo retrieve done", zap.Duration("timeTaken", tr.ElapseSpan()))
return nil, nil
}).Await()
if err := HandleCStatus(ctx, &status, "Retrieve failed",
zap.Int64("collectionID", s.Collection()),
zap.Int64("partitionID", s.Partition()),
zap.Int64("segmentID", s.ID()),
zap.Int64("msgID", plan.msgID),
zap.String("segmentType", s.segmentType.String())); err != nil {
future := cgo.Async(
ctx,
func() cgo.CFuturePtr {
return (cgo.CFuturePtr)(C.AsyncRetrieve(
traceCtx.ctx,
s.ptr,
plan.cRetrievePlan,
C.uint64_t(plan.Timestamp),
C.int64_t(maxLimitSize),
C.bool(plan.ignoreNonPk),
))
},
cgo.WithName("retrieve"),
)
defer future.Release()
result, err := future.BlockAndLeakyGet()
if err != nil {
log.Warn("Retrieve failed")
return nil, err
}
defer C.DeleteRetrieveResult((*C.CRetrieveResult)(result))
metrics.QueryNodeSQSegmentLatencyInCore.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()),
metrics.QueryLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
_, span := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, "partial-segcore-results-deserialization")
defer span.End()
result := new(segcorepb.RetrieveResults)
if err := HandleCProto(&retrieveResult.cRetrieveResult, result); err != nil {
retrieveResult := new(segcorepb.RetrieveResults)
if err := UnmarshalCProto((*C.CRetrieveResult)(result), retrieveResult); err != nil {
log.Warn("unmarshal retrieve result failed", zap.Error(err))
return nil, err
}
log.Debug("retrieve segment done",
zap.Int("resultNum", len(result.Offset)),
zap.Int("resultNum", len(retrieveResult.Offset)),
)
// Sort was done by the segcore.
// sort.Sort(&byPK{result})
return result, nil
return retrieveResult, nil
}
func (s *LocalSegment) RetrieveByOffsets(ctx context.Context, plan *RetrievePlan, offsets []int64) (*segcorepb.RetrieveResults, error) {
if len(offsets) == 0 {
return nil, merr.WrapErrParameterInvalid("segment offsets", "empty offsets")
}
if !s.ptrLock.RLockIf(state.IsNotReleased) {
// TODO: check if the segment is readable but not released. Too much related logic needs to be refactored.
return nil, merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
if s.ptr == nil {
return nil, merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
if len(offsets) == 0 {
return nil, merr.WrapErrParameterInvalid("segment offsets", "empty offsets")
}
fields := []zap.Field{
zap.Int64("collectionID", s.Collection()),
zap.Int64("partitionID", s.Partition()),
@ -686,40 +689,49 @@ func (s *LocalSegment) RetrieveByOffsets(ctx context.Context, plan *RetrievePlan
log := log.Ctx(ctx).With(fields...)
log.Debug("begin to retrieve by offsets")
traceCtx := ParseCTraceContext(ctx)
var retrieveResult RetrieveResult
var status C.CStatus
tr := timerecord.NewTimeRecorder("cgoRetrieveByOffsets")
status = C.RetrieveByOffsets(traceCtx.ctx,
s.ptr,
plan.cRetrievePlan,
&retrieveResult.cRetrieveResult,
(*C.int64_t)(unsafe.Pointer(&offsets[0])),
C.int64_t(len(offsets)))
runtime.KeepAlive(traceCtx)
traceCtx := ParseCTraceContext(ctx)
defer runtime.KeepAlive(traceCtx)
defer runtime.KeepAlive(plan)
defer runtime.KeepAlive(offsets)
future := cgo.Async(
ctx,
func() cgo.CFuturePtr {
return (cgo.CFuturePtr)(C.AsyncRetrieveByOffsets(
traceCtx.ctx,
s.ptr,
plan.cRetrievePlan,
(*C.int64_t)(unsafe.Pointer(&offsets[0])),
C.int64_t(len(offsets)),
))
},
cgo.WithName("retrieve-by-offsets"),
)
defer future.Release()
result, err := future.BlockAndLeakyGet()
if err != nil {
log.Warn("RetrieveByOffsets failed")
return nil, err
}
defer C.DeleteRetrieveResult((*C.CRetrieveResult)(result))
metrics.QueryNodeSQSegmentLatencyInCore.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()),
metrics.QueryLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
log.Debug("cgo retrieve by offsets done", zap.Duration("timeTaken", tr.ElapseSpan()))
if err := HandleCStatus(ctx, &status, "RetrieveByOffsets failed", fields...); err != nil {
return nil, err
}
_, span := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, "reduced-segcore-results-deserialization")
defer span.End()
result := new(segcorepb.RetrieveResults)
if err := HandleCProto(&retrieveResult.cRetrieveResult, result); err != nil {
retrieveResult := new(segcorepb.RetrieveResults)
if err := UnmarshalCProto((*C.CRetrieveResult)(result), retrieveResult); err != nil {
log.Warn("unmarshal retrieve by offsets result failed", zap.Error(err))
return nil, err
}
log.Debug("retrieve by segment offsets done")
return result, nil
log.Debug("retrieve by segment offsets done",
zap.Int("resultNum", len(retrieveResult.Offset)),
)
return retrieveResult, nil
}
func (s *LocalSegment) GetFieldDataPath(index *IndexedFieldInfo, offset int64) (dataPath string, offsetInBinlog int64) {

View File

@ -0,0 +1,27 @@
package cgo
/*
#cgo pkg-config: milvus_common
#include "common/type_c.h"
#include <stdlib.h>
*/
import "C"
import (
"unsafe"
"github.com/milvus-io/milvus/pkg/util/merr"
)
func ConsumeCStatusIntoError(status *C.CStatus) error {
if status.error_code == 0 {
return nil
}
errorCode := status.error_code
errorMsg := C.GoString(status.error_msg)
getCGOCaller().call("free", func() {
C.free(unsafe.Pointer(status.error_msg))
})
return merr.SegcoreError(int32(errorCode), errorMsg)
}
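A pure-Go model of the consume-and-free flow, as a sketch only: the status struct below is a hypothetical stand-in for the cgo-generated C.CStatus, and the real helper additionally frees error_msg through the cgo caller and wraps the code via merr.SegcoreError.

package main

import "fmt"

// status mirrors the shape of C.CStatus for illustration only.
type status struct {
	errorCode int32
	errorMsg  string
}

// consume converts a status into a Go error exactly once;
// a zero error code means success and yields a nil error.
func consume(s *status) error {
	if s.errorCode == 0 {
		return nil
	}
	return fmt.Errorf("segcore error %d: %s", s.errorCode, s.errorMsg)
}

func main() {
	fmt.Println(consume(&status{}))                                  // <nil>
	fmt.Println(consume(&status{errorCode: 2001, errorMsg: "oops"})) // segcore error 2001: oops
}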

View File

@ -0,0 +1,36 @@
package cgo
/*
#cgo pkg-config: milvus_futures
#include "futures/future_c.h"
*/
import "C"
import (
"math"
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/config"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
// initExecutor initializes the underlying cgo thread pool.
func initExecutor() {
pt := paramtable.Get()
initPoolSize := int(math.Ceil(pt.QueryNodeCfg.MaxReadConcurrency.GetAsFloat() * pt.QueryNodeCfg.CGOPoolSizeRatio.GetAsFloat()))
C.executor_set_thread_num(C.int(initPoolSize))
resetThreadNum := func(evt *config.Event) {
if evt.HasUpdated {
pt := paramtable.Get()
newSize := int(math.Ceil(pt.QueryNodeCfg.MaxReadConcurrency.GetAsFloat() * pt.QueryNodeCfg.CGOPoolSizeRatio.GetAsFloat()))
log.Info("reset cgo thread num", zap.Int("thread_num", newSize))
C.executor_set_thread_num(C.int(newSize))
}
}
pt.Watch(pt.QueryNodeCfg.MaxReadConcurrency.Key, config.NewHandler("cgo."+pt.QueryNodeCfg.MaxReadConcurrency.Key, resetThreadNum))
pt.Watch(pt.QueryNodeCfg.CGOPoolSizeRatio.Key, config.NewHandler("cgo."+pt.QueryNodeCfg.CGOPoolSizeRatio.Key, resetThreadNum))
}
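For example, with MaxReadConcurrency = 32 and CGOPoolSizeRatio = 2.0 (illustrative values, not defaults), the executor is sized to ceil(32 × 2.0) = 64 threads; the two Watch handlers re-apply the same formula whenever either parameter changes at runtime.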

View File

@ -0,0 +1,192 @@
package cgo
/*
#cgo pkg-config: milvus_futures
#include "futures/future_c.h"
#include <stdlib.h>
extern void unlockMutex(void*);
static inline void unlockMutexOnC(CLockedGoMutex* m) {
unlockMutex((void*)(m));
}
static inline void future_go_register_ready_callback(CFuture* f, CLockedGoMutex* m) {
future_register_ready_callback(f, unlockMutexOnC, m);
}
*/
import "C"
import (
"context"
"sync"
"unsafe"
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus/pkg/util/merr"
)
var ErrConsumed = errors.New("future is already consumed")
// Would put this in futures.go but for the documented issue with
// exports and functions in preamble
// (https://code.google.com/p/go-wiki/wiki/cgo#Global_functions)
//
//export unlockMutex
func unlockMutex(p unsafe.Pointer) {
m := (*sync.Mutex)(p)
m.Unlock()
}
type basicFuture interface {
// Context return the context of the future.
Context() context.Context
// BlockUntilReady blocks until the future is ready or canceled.
// Callers may invoke this method multiple times from different goroutines.
BlockUntilReady()
// cancel the future with error.
cancel(error)
}
type Future interface {
basicFuture
// BlockAndLeakyGet blocks until the future is ready or canceled, and returns the leaked result.
// Callers should call BlockAndLeakyGet only once; otherwise ErrConsumed will be returned.
// Callers will get merr.ErrSegcoreFollyCancel (marked with the corresponding context error) if the future is canceled or times out.
// Callers will get any other error if the underlying cgo function throws; otherwise they get the result.
// Callers must free the result after use (in a caller-defined way); otherwise the result's memory is leaked.
BlockAndLeakyGet() (unsafe.Pointer, error)
// Release releases the resources of the future.
// !!! Release is not concurrency-safe with other methods.
// It should be called only once, after all other methods of the future have returned.
Release()
}
type (
CFuturePtr unsafe.Pointer
CGOAsyncFunction = func() CFuturePtr
)
// Async is a helper function to call a C async function that returns a future.
func Async(ctx context.Context, f CGOAsyncFunction, opts ...Opt) Future {
initCGO()
options := getDefaultOpt()
// apply options.
for _, opt := range opts {
opt(options)
}
// create a future for caller to use.
var cFuturePtr *C.CFuture
getCGOCaller().call(options.name, func() {
cFuturePtr = (*C.CFuture)(f())
})
ctx, cancel := context.WithCancel(ctx)
future := &futureImpl{
closure: f,
ctx: ctx,
ctxCancel: cancel,
releaserOnce: sync.Once{},
future: cFuturePtr,
opts: options,
state: newFutureState(),
}
// register the future for timeout notification.
futureManager.Register(future)
return future
}
type futureImpl struct {
ctx context.Context
ctxCancel context.CancelFunc
future *C.CFuture
closure CGOAsyncFunction
opts *options
state futureState
releaserOnce sync.Once
}
func (f *futureImpl) Context() context.Context {
return f.ctx
}
func (f *futureImpl) BlockUntilReady() {
f.blockUntilReady()
}
func (f *futureImpl) BlockAndLeakyGet() (unsafe.Pointer, error) {
f.blockUntilReady()
if !f.state.intoConsumed() {
return nil, ErrConsumed
}
var ptr unsafe.Pointer
var status C.CStatus
getCGOCaller().call("future_leak_and_get", func() {
status = C.future_leak_and_get(f.future, &ptr)
})
err := ConsumeCStatusIntoError(&status)
if errors.Is(err, merr.ErrSegcoreFollyCancel) {
// mark the error with context error.
return nil, errors.Mark(err, f.ctx.Err())
}
return ptr, err
}
func (f *futureImpl) Release() {
// block until ready to release the future.
f.blockUntilReady()
// release the future.
getCGOCaller().call("future_destroy", func() {
C.future_destroy(f.future)
})
}
func (f *futureImpl) cancel(err error) {
if !f.state.checkUnready() {
// only an unready future can be canceled;
// canceling a ready future makes no sense.
return
}
if errors.IsAny(err, context.DeadlineExceeded, context.Canceled) {
getCGOCaller().call("future_cancel", func() {
C.future_cancel(f.future)
})
return
}
panic("unreachable: invalid cancel error type")
}
func (f *futureImpl) blockUntilReady() {
if !f.state.checkUnready() {
// only an unready future needs to block until ready.
return
}
mu := &sync.Mutex{}
mu.Lock()
getCGOCaller().call("future_go_register_ready_callback", func() {
C.future_go_register_ready_callback(f.future, (*C.CLockedGoMutex)(unsafe.Pointer(mu)))
})
mu.Lock()
// mark the future as ready on the Go side to avoid further cgo calls.
f.state.intoReady()
// notify the future manager that the future is ready.
f.ctxCancel()
if f.opts.releaser != nil {
f.releaserOnce.Do(f.opts.releaser)
}
}
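Tying the contract together, a sketch of a typical call site. C.AsyncSomething and freeResult are hypothetical stand-ins (this would live in a cgo file); only the Async / BlockAndLeakyGet / Release ordering follows the interface documented above.

func doAsync(ctx context.Context) error {
	future := cgo.Async(ctx, func() cgo.CFuturePtr {
		// hypothetical C entry point returning a CFuture*
		return cgo.CFuturePtr(C.AsyncSomething())
	}, cgo.WithName("something"))
	defer future.Release() // exactly once, after all other methods return

	result, err := future.BlockAndLeakyGet()
	if err != nil {
		// cancel/timeout errors arrive marked with the context error
		return err
	}
	defer freeResult(result) // the leaked result is owned by the caller
	// ... deserialize result ...
	return nil
}

blockUntilReady parks the goroutine with a double-lock handshake: lock a mutex, hand its address to C as the callback payload, then lock again so the second Lock blocks until the C-side ready callback unlocks it. This is legal because Go mutexes are not goroutine-owned; a pure-Go rendering:

mu := &sync.Mutex{}
mu.Lock()
go mu.Unlock() // stands in for the C ready-callback
mu.Lock()      // blocks until the callback fires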

View File

@ -0,0 +1,272 @@
package cgo
import (
"context"
"fmt"
"os"
"runtime"
"sync"
"testing"
"time"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
func TestMain(m *testing.M) {
paramtable.Init()
initCGO()
exitCode := m.Run()
if exitCode > 0 {
os.Exit(exitCode)
}
}
func TestFutureWithSuccessCase(t *testing.T) {
// Test success case.
future := createFutureWithTestCase(context.Background(), testCase{
interval: 100 * time.Millisecond,
loopCnt: 10,
caseNo: 100,
})
defer future.Release()
start := time.Now()
future.BlockUntilReady() // test block until ready too.
result, err := future.BlockAndLeakyGet()
assert.NoError(t, err)
assert.Equal(t, 100, getCInt(result))
// The inner function sleeps for 1 second, so the future must take longer than 0.5 seconds.
assert.Greater(t, time.Since(start).Seconds(), 0.5)
// free the result after used.
freeCInt(result)
runtime.GC()
_, err = future.BlockAndLeakyGet()
assert.ErrorIs(t, err, ErrConsumed)
}
func TestFutureWithCaseNoInterrupt(t *testing.T) {
// Test success case.
future := createFutureWithTestCase(context.Background(), testCase{
interval: 100 * time.Millisecond,
loopCnt: 10,
caseNo: caseNoNoInterrupt,
})
defer future.Release()
start := time.Now()
future.BlockUntilReady() // test block until ready too.
result, err := future.BlockAndLeakyGet()
assert.NoError(t, err)
assert.Equal(t, 0, getCInt(result))
// The inner function sleeps for 1 second, so the future must take longer than 0.5 seconds.
assert.Greater(t, time.Since(start).Seconds(), 0.5)
// free the result after used.
freeCInt(result)
// Test cancellation when the underlying task does not handle interrupts.
start = time.Now()
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
future = createFutureWithTestCase(ctx, testCase{
interval: 100 * time.Millisecond,
loopCnt: 20,
caseNo: caseNoNoInterrupt,
})
defer future.Release()
result, err = future.BlockAndLeakyGet()
// the context times the future out after 200ms, but the underlying task ignores the cancellation, so the future only returns after the full 2s.
assert.Greater(t, time.Since(start).Seconds(), 2.0)
assert.NoError(t, err)
assert.NotNil(t, result)
assert.Equal(t, 0, getCInt(result))
freeCInt(result)
}
// TestFutures tests the future implementation.
func TestFutures(t *testing.T) {
// Test failure case: throw a std exception.
future := createFutureWithTestCase(context.Background(), testCase{
interval: 100 * time.Millisecond,
loopCnt: 10,
caseNo: caseNoThrowStdException,
})
defer future.Release()
start := time.Now()
future.BlockUntilReady() // test block until ready too.
result, err := future.BlockAndLeakyGet()
assert.Error(t, err)
assert.ErrorIs(t, err, merr.ErrSegcoreUnsupported)
assert.Nil(t, result)
// The inner function sleeps for 1 second, so the future must take longer than 0.5 seconds.
assert.Greater(t, time.Since(start).Seconds(), 0.5)
// Test failure case: throw a folly exception.
future = createFutureWithTestCase(context.Background(), testCase{
interval: 100 * time.Millisecond,
loopCnt: 10,
caseNo: caseNoThrowFollyException,
})
defer future.Release()
start = time.Now()
future.BlockUntilReady() // test block until ready too.
result, err = future.BlockAndLeakyGet()
assert.Error(t, err)
assert.ErrorIs(t, err, merr.ErrSegcoreFollyOtherException)
assert.Nil(t, result)
// The inner function sleeps for 1 second, so the future must take longer than 0.5 seconds.
assert.Greater(t, time.Since(start).Seconds(), 0.5)
// Test failure case: throw a segcore exception.
future = createFutureWithTestCase(context.Background(), testCase{
interval: 100 * time.Millisecond,
loopCnt: 10,
caseNo: caseNoThrowSegcoreException,
})
defer future.Release()
start = time.Now()
future.BlockUntilReady() // test block until ready too.
result, err = future.BlockAndLeakyGet()
assert.Error(t, err)
assert.ErrorIs(t, err, merr.ErrSegcoreUnsupported)
assert.Nil(t, result)
// The inner function sleeps for 1 second, so the future must take longer than 0.5 seconds.
assert.Greater(t, time.Since(start).Seconds(), 0.5)
// Test cancellation.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
future = createFutureWithTestCase(ctx, testCase{
interval: 100 * time.Millisecond,
loopCnt: 20,
caseNo: 100,
})
defer future.Release()
// cancel before the future (which takes 2s) becomes ready.
go func() {
time.Sleep(200 * time.Millisecond)
cancel()
}()
start = time.Now()
result, err = future.BlockAndLeakyGet()
// the context cancels the future after 200ms, so it should finish well within 1s instead of the full 2s.
assert.Less(t, time.Since(start).Seconds(), 1.0)
assert.Error(t, err)
assert.ErrorIs(t, err, merr.ErrSegcoreFollyCancel)
assert.True(t, errors.Is(err, context.Canceled))
assert.Nil(t, result)
// Test cancellation.
ctx, cancel = context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
future = createFutureWithTestCase(ctx, testCase{
interval: 100 * time.Millisecond,
loopCnt: 20,
caseNo: 100,
})
defer future.Release()
start = time.Now()
result, err = future.BlockAndLeakyGet()
// the context times the future out after 200ms, so it should finish well within 1s instead of the full 2s.
assert.Less(t, time.Since(start).Seconds(), 1.0)
assert.Error(t, err)
assert.ErrorIs(t, err, merr.ErrSegcoreFollyCancel)
assert.True(t, errors.Is(err, context.DeadlineExceeded))
assert.Nil(t, result)
runtime.GC()
}
func TestConcurrent(t *testing.T) {
// This test keeps compatibility with the old fast-fail future implementation,
// so it is complicated and not easy to follow.
wg := sync.WaitGroup{}
for i := 0; i < 3; i++ {
wg.Add(4)
// success case
go func() {
defer wg.Done()
// Test success case.
future := createFutureWithTestCase(context.Background(), testCase{
interval: 100 * time.Millisecond,
loopCnt: 10,
caseNo: 100,
})
defer future.Release()
result, err := future.BlockAndLeakyGet()
assert.NoError(t, err)
assert.Equal(t, 100, getCInt(result))
freeCInt(result)
}()
// fail case
go func() {
defer wg.Done()
// Test failure case.
future := createFutureWithTestCase(context.Background(), testCase{
interval: 100 * time.Millisecond,
loopCnt: 10,
caseNo: caseNoThrowStdException,
})
defer future.Release()
result, err := future.BlockAndLeakyGet()
assert.Error(t, err)
assert.Nil(t, result)
}()
// timeout case
go func() {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
future := createFutureWithTestCase(ctx, testCase{
interval: 100 * time.Millisecond,
loopCnt: 20,
caseNo: 100,
})
defer future.Release()
result, err := future.BlockAndLeakyGet()
assert.Error(t, err)
assert.ErrorIs(t, err, merr.ErrSegcoreFollyCancel)
assert.True(t, errors.Is(err, context.DeadlineExceeded))
assert.Nil(t, result)
}()
// no interrupt with timeout case
go func() {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
future := createFutureWithTestCase(ctx, testCase{
interval: 100 * time.Millisecond,
loopCnt: 10,
caseNo: caseNoNoInterrupt,
})
defer future.Release()
result, err := future.BlockAndLeakyGet()
if err == nil {
assert.Equal(t, 0, getCInt(result))
} else {
// the future may still be queued and not yet started,
// so the underlying task may throw a cancel exception if it never ran.
assert.ErrorIs(t, err, merr.ErrSegcoreFollyCancel)
assert.True(t, errors.Is(err, context.DeadlineExceeded))
}
freeCInt(result)
}()
}
wg.Wait()
assert.Eventually(t, func() bool {
stat := futureManager.Stat()
fmt.Printf("active count: %d\n", stat.ActiveCount)
return stat.ActiveCount == 0
}, 5*time.Second, 100*time.Millisecond)
runtime.GC()
}

View File

@ -0,0 +1,48 @@
//go:build test
// +build test
package cgo
/*
#cgo pkg-config: milvus_futures
#include "futures/future_c.h"
#include <stdlib.h>
*/
import "C"
import (
"context"
"time"
"unsafe"
)
const (
caseNoNoInterrupt int = 0
caseNoThrowStdException int = 1
caseNoThrowFollyException int = 2
caseNoThrowSegcoreException int = 3
)
type testCase struct {
interval time.Duration
loopCnt int
caseNo int
}
func createFutureWithTestCase(ctx context.Context, testCase testCase) Future {
f := func() CFuturePtr {
return (CFuturePtr)(C.future_create_test_case(C.int(testCase.interval.Milliseconds()), C.int(testCase.loopCnt), C.int(testCase.caseNo)))
}
future := Async(ctx, f, WithName("createFutureWithTestCase"))
return future
}
func getCInt(p unsafe.Pointer) int {
return int(*(*C.int)(p))
}
func freeCInt(p unsafe.Pointer) {
C.free(p)
}

View File

@ -0,0 +1,114 @@
package cgo
import (
"reflect"
"sync"
"go.uber.org/atomic"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
const (
registerIndex = 0
maxSelectCase = 65535
defaultRegisterBuf = 1
)
var (
futureManager *activeFutureManager
initOnce sync.Once
)
// initCGO initializes the cgo caller and future manager.
func initCGO() {
initOnce.Do(func() {
nodeID := paramtable.GetStringNodeID()
initCaller(nodeID)
initExecutor()
futureManager = newActiveFutureManager(nodeID)
futureManager.Run()
})
}
type futureManagerStat struct {
ActiveCount int64
}
func newActiveFutureManager(nodeID string) *activeFutureManager {
manager := &activeFutureManager{
activeCount: atomic.NewInt64(0),
activeFutures: make([]basicFuture, 0),
cases: make([]reflect.SelectCase, 1),
register: make(chan basicFuture, defaultRegisterBuf),
nodeID: nodeID,
}
manager.cases[0] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(manager.register),
}
return manager
}
// activeFutureManager manages the active futures.
// It transfers cancel signals into cgo.
type activeFutureManager struct {
activeCount *atomic.Int64
activeFutures []basicFuture
cases []reflect.SelectCase
register chan basicFuture
nodeID string
}
// Run starts the active future manager.
func (m *activeFutureManager) Run() {
go func() {
for {
m.doSelect()
}
}()
}
// Register registers a newly created future with the manager.
func (m *activeFutureManager) Register(c basicFuture) {
m.register <- c
}
// Stat returns the manager's statistics; currently only used for testing.
func (m *activeFutureManager) Stat() futureManagerStat {
return futureManagerStat{
ActiveCount: m.activeCount.Load(),
}
}
// doSelect waits on the active futures and cancels the finished ones.
func (m *activeFutureManager) doSelect() {
index, newCancelableObject, _ := reflect.Select(m.getSelectableCases())
if index == registerIndex {
newCancelable := newCancelableObject.Interface().(basicFuture)
m.cases = append(m.cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(newCancelable.Context().Done()),
})
m.activeFutures = append(m.activeFutures, newCancelable)
} else {
m.cases = append(m.cases[:index], m.cases[index+1:]...)
offset := index - 1
// cancel the future and remove it from the active set.
m.activeFutures[offset].cancel(m.activeFutures[offset].Context().Err())
m.activeFutures = append(m.activeFutures[:offset], m.activeFutures[offset+1:]...)
}
activeTotal := len(m.activeFutures)
m.activeCount.Store(int64(activeTotal))
metrics.ActiveFutureTotal.WithLabelValues(
m.nodeID,
).Set(float64(activeTotal))
}
func (m *activeFutureManager) getSelectableCases() []reflect.SelectCase {
if len(m.cases) <= maxSelectCase {
return m.cases
}
return m.cases[0:maxSelectCase]
}
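The manager waits on a dynamically growing and shrinking set of Done channels with a single goroutine via reflect.Select; a self-contained sketch of that technique, independent of futures:

package main

import (
	"fmt"
	"reflect"
)

func main() {
	chans := []chan int{make(chan int, 1), make(chan int, 1)}
	cases := make([]reflect.SelectCase, 0, len(chans))
	for _, ch := range chans {
		cases = append(cases, reflect.SelectCase{
			Dir:  reflect.SelectRecv,
			Chan: reflect.ValueOf(ch),
		})
	}
	chans[1] <- 42
	// blocks until any case in the dynamic set is ready.
	idx, val, _ := reflect.Select(cases)
	fmt.Println(idx, val.Int()) // 1 42
	// drop the fired case, as doSelect does after canceling a future.
	cases = append(cases[:idx], cases[idx+1:]...)
	fmt.Println(len(cases)) // 1
}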

View File

@ -0,0 +1,32 @@
package cgo
func getDefaultOpt() *options {
return &options{
name: "unknown",
releaser: nil,
}
}
type options struct {
name string
releaser func()
}
// Opt is the option type for future.
type Opt func(*options)
// WithReleaser sets the releaser function.
// When a future is ready, the releaser function will be called once.
func WithReleaser(releaser func()) Opt {
return func(o *options) {
o.releaser = releaser
}
}
// WithName sets the name of the future.
// Only used for metrics.
func WithName(name string) Opt {
return func(o *options) {
o.name = name
}
}
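For instance, a caller might combine both options; everything here besides WithName and WithReleaser is a hypothetical fragment:

future := cgo.Async(ctx, f,
	cgo.WithName("retrieve"), // label on the cgo duration metric
	cgo.WithReleaser(func() {
		// runs once when the future becomes ready, e.g. to release
		// request memory pinned for the duration of the cgo call.
	}),
)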

internal/util/cgo/pool.go
View File

@ -0,0 +1,56 @@
package cgo
import (
"math"
"runtime"
"time"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/hardware"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
var caller *cgoCaller
func initCaller(nodeID string) {
chSize := int64(math.Ceil(float64(hardware.GetCPUNum()) * paramtable.Get().QueryNodeCfg.CGOPoolSizeRatio.GetAsFloat()))
if chSize <= 0 {
chSize = 1
}
caller = &cgoCaller{
ch: make(chan struct{}, chSize),
nodeID: nodeID,
}
}
// getCGOCaller returns the cgoCaller instance.
func getCGOCaller() *cgoCaller {
return caller
}
// cgoCaller is a limiter to restrict the number of concurrent cgo calls.
type cgoCaller struct {
ch chan struct{}
nodeID string
}
// call runs the work function under a concurrency limit that restricts the number of concurrent cgo calls.
// It collects some metrics too.
func (c *cgoCaller) call(name string, work func()) {
start := time.Now()
c.ch <- struct{}{}
queueTime := time.Since(start)
metrics.CGOQueueDuration.WithLabelValues(c.nodeID).Observe(queueTime.Seconds())
runtime.LockOSThread()
defer func() {
runtime.UnlockOSThread()
<-c.ch
metrics.RunningCgoCallTotal.WithLabelValues(c.nodeID).Dec()
total := time.Since(start) - queueTime
metrics.CGODuration.WithLabelValues(c.nodeID, name).Observe(total.Seconds())
}()
metrics.RunningCgoCallTotal.WithLabelValues(c.nodeID).Inc()
work()
}
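cgoCaller is a counting semaphore built on a buffered channel; stripped of metrics and OS-thread locking, the limiter pattern looks like this self-contained sketch:

package main

import (
	"fmt"
	"sync"
)

// limiter bounds how many work functions run concurrently.
type limiter struct{ ch chan struct{} }

func newLimiter(n int) *limiter { return &limiter{ch: make(chan struct{}, n)} }

func (l *limiter) call(work func()) {
	l.ch <- struct{}{}        // acquire a slot; blocks when n calls are in flight
	defer func() { <-l.ch }() // release the slot
	work()
}

func main() {
	l := newLimiter(2) // at most 2 concurrent "cgo" calls
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			l.call(func() { fmt.Println("call", i) })
		}(i)
	}
	wg.Wait()
}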

View File

@ -0,0 +1,38 @@
package cgo
import "go.uber.org/atomic"
const (
stateUnready int32 = iota
stateReady
stateConsumed
)
// newFutureState creates a new futureState.
func newFutureState() futureState {
return futureState{
inner: atomic.NewInt32(stateUnready),
}
}
// futureState is a state machine for future.
// unready --BlockUntilReady--> ready --BlockAndLeakyGet--> consumed
type futureState struct {
inner *atomic.Int32
}
// intoReady sets the state to ready.
func (s *futureState) intoReady() {
s.inner.CompareAndSwap(stateUnready, stateReady)
}
// intoConsumed sets the state to consumed.
// if the state is not ready, it does nothing and returns false.
func (s *futureState) intoConsumed() bool {
return s.inner.CompareAndSwap(stateReady, stateConsumed)
}
// checkUnready checks if the state is unready.
func (s *futureState) checkUnready() bool {
return s.inner.Load() == stateUnready
}
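The transitions are one-way and race-free thanks to CompareAndSwap; a quick illustrative snippet using the helpers above:

s := newFutureState()
_ = s.checkUnready() // true: freshly created futures are unready
_ = s.intoConsumed() // false: consumed can only follow ready
s.intoReady()        // unready -> ready
_ = s.intoConsumed() // true: ready -> consumed, exactly once
_ = s.intoConsumed() // false: a second BlockAndLeakyGet sees ErrConsumed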

View File

@ -0,0 +1,86 @@
package metrics
import (
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
)
var (
subsystemCGO = "cgo"
cgoLabelName = "name"
once sync.Once
bucketsForCGOCall = []float64{
10 * time.Nanosecond.Seconds(),
100 * time.Nanosecond.Seconds(),
250 * time.Nanosecond.Seconds(),
500 * time.Nanosecond.Seconds(),
time.Microsecond.Seconds(),
10 * time.Microsecond.Seconds(),
20 * time.Microsecond.Seconds(),
50 * time.Microsecond.Seconds(),
100 * time.Microsecond.Seconds(),
250 * time.Microsecond.Seconds(),
500 * time.Microsecond.Seconds(),
time.Millisecond.Seconds(),
2 * time.Millisecond.Seconds(),
10 * time.Millisecond.Seconds(),
}
ActiveFutureTotal = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: subsystemCGO,
Name: "active_future_total",
Help: "Total number of active futures.",
}, []string{
nodeIDLabelName,
},
)
RunningCgoCallTotal = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: subsystemCGO,
Name: "running_cgo_call_total",
Help: "Total number of running cgo calls.",
}, []string{
nodeIDLabelName,
})
CGODuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: subsystemCGO,
Name: "cgo_duration_seconds",
Help: "Histogram of cgo call duration in seconds.",
Buckets: bucketsForCGOCall,
}, []string{
nodeIDLabelName,
cgoLabelName,
},
)
CGOQueueDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: subsystemCGO,
Name: "cgo_queue_duration_seconds",
Help: "Duration of cgo call in queue.",
Buckets: bucketsForCGOCall,
}, []string{
nodeIDLabelName,
},
)
)
// RegisterCGOMetrics registers the cgo metrics.
func RegisterCGOMetrics(registry *prometheus.Registry) {
once.Do(func() {
registry.MustRegister(ActiveFutureTotal)
registry.MustRegister(RunningCgoCallTotal)
registry.MustRegister(CGODuration)
registry.MustRegister(CGOQueueDuration)
})
}

View File

@ -38,6 +38,7 @@ func TestRegisterMetrics(t *testing.T) {
RegisterMetaMetrics(r)
RegisterStorageMetrics(r)
RegisterMsgStreamMetrics(r)
RegisterCGOMetrics(r)
})
}

View File

@ -805,6 +805,8 @@ func RegisterQueryNode(registry *prometheus.Registry) {
registry.MustRegister(QueryNodeApplyBFCost)
registry.MustRegister(QueryNodeForwardDeleteCost)
registry.MustRegister(QueryNodeSegmentPruneRatio)
// Add cgo metrics
RegisterCGOMetrics(registry)
}
func CleanupQueryNodeCollectionMetrics(nodeID int64, collectionID int64) {

View File

@ -175,9 +175,11 @@ var (
ErrInvalidStreamObj = newMilvusError("invalid stream object", 1903, false)
// Segcore related
ErrSegcore = newMilvusError("segcore error", 2000, false)
ErrSegcoreUnsupported = newMilvusError("segcore unsupported error", 2001, false)
ErrSegcorePretendFinished = newMilvusError("segcore pretend finished", 2002, false)
ErrSegcore = newMilvusError("segcore error", 2000, false)
ErrSegcoreUnsupported = newMilvusError("segcore unsupported error", 2001, false)
ErrSegcorePretendFinished = newMilvusError("segcore pretend finished", 2002, false)
ErrSegcoreFollyOtherException = newMilvusError("segcore folly other exception", 2037, false) // thrown from segcore.
ErrSegcoreFollyCancel = newMilvusError("segcore Future was canceled", 2038, false) // thrown from segcore.
// Do NOT export this,
// never allow programmers to use this; keep it only for converting unknown errors to milvusError

View File

@ -3442,7 +3442,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #
Key: "dataCoord.gc.interval",
Version: "2.0.0",
DefaultValue: "3600",
Doc: "gc interval in seconds",
Doc: "meta-based gc scanning interval in seconds",
Export: true,
}
p.GCInterval.Init(base.mgr)
@ -3451,7 +3451,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #
Key: "dataCoord.gc.scanInterval",
Version: "2.4.0",
DefaultValue: "168", // hours, default 7 * 24 hours
Doc: "garbage collection scan residue interval in hours",
Doc: "orphan file (file on oss but has not been registered on meta) on object storage garbage collection scanning interval in hours",
Export: true,
}
p.GCScanIntervalInHour.Init(base.mgr)
@ -3461,7 +3461,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #
Key: "dataCoord.gc.missingTolerance",
Version: "2.0.0",
DefaultValue: "86400",
Doc: "file meta missing tolerance duration in seconds, default to 24hr(1d)",
Doc: "orphan file gc tolerance duration in seconds (orphan file which last modified time before the tolerance interval ago will be deleted)",
Export: true,
}
p.GCMissingTolerance.Init(base.mgr)
@ -3470,7 +3470,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #
Key: "dataCoord.gc.dropTolerance",
Version: "2.0.0",
DefaultValue: "10800",
Doc: "file belongs to dropped entity tolerance duration in seconds. 3600",
Doc: "meta-based gc tolerace duration in seconds (file which meta is marked as dropped before the tolerace interval ago will be deleted)",
Export: true,
}
p.GCDropTolerance.Init(base.mgr)

View File

@ -107,6 +107,7 @@ go test -race -cover -tags dynamic,test "${MILVUS_DIR}/util/typeutil/..." -failf
go test -race -cover -tags dynamic,test "${MILVUS_DIR}/util/importutilv2/..." -failfast -count=1 -ldflags="-r ${RPATH}"
go test -race -cover -tags dynamic,test "${MILVUS_DIR}/util/proxyutil/..." -failfast -count=1 -ldflags="-r ${RPATH}"
go test -race -cover -tags dynamic,test "${MILVUS_DIR}/util/initcore/..." -failfast -count=1 -ldflags="-r ${RPATH}"
go test -race -cover -tags dynamic,test "${MILVUS_DIR}/util/cgo/..." -failfast -count=1 -ldflags="-r ${RPATH}"
}
function test_pkg()