enhance: async cgo utility (#33133)

issue: #30926, #33132

- implement future-based cgo utility.

---------

Signed-off-by: chyezh <chyezh@outlook.com>
pull/33736/head
chyezh 2024-06-09 22:55:53 +08:00 committed by GitHub
parent 80a2cd192a
commit f53ab54c5d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
28 changed files with 1802 additions and 9 deletions

View File

@ -159,8 +159,8 @@ lint-fix: getdeps
#TODO: Check code specifications by golangci-lint
static-check: getdeps
@echo "Running $@ check"
@source $(PWD)/scripts/setenv.sh && GO111MODULE=on $(INSTALL_PATH)/golangci-lint run --timeout=30m --config $(PWD)/.golangci.yml
@source $(PWD)/scripts/setenv.sh && cd pkg && GO111MODULE=on $(INSTALL_PATH)/golangci-lint run --timeout=30m --config $(PWD)/.golangci.yml
@source $(PWD)/scripts/setenv.sh && GO111MODULE=on $(INSTALL_PATH)/golangci-lint run --build-tags dynamic,test --timeout=30m --config $(PWD)/.golangci.yml
@source $(PWD)/scripts/setenv.sh && cd pkg && GO111MODULE=on $(INSTALL_PATH)/golangci-lint run --build-tags dynamic,test --timeout=30m --config $(PWD)/.golangci.yml
@source $(PWD)/scripts/setenv.sh && cd client && GO111MODULE=on $(INSTALL_PATH)/golangci-lint run --timeout=30m --config $(PWD)/client/.golangci.yml
verifiers: build-cpp getdeps cppcheck fmt static-check

View File

@ -305,6 +305,11 @@ install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/src/common/
FILES_MATCHING PATTERN "*_c.h"
)
install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/src/futures/
DESTINATION include/futures
FILES_MATCHING PATTERN "*.h"
)
install(DIRECTORY ${CMAKE_BINARY_DIR}/lib/
DESTINATION ${CMAKE_INSTALL_FULL_LIBDIR}
)

View File

@ -35,3 +35,4 @@ add_subdirectory( indexbuilder )
add_subdirectory( clustering )
add_subdirectory( exec )
add_subdirectory( bitset )
add_subdirectory( futures )

View File

@ -63,6 +63,9 @@ enum ErrorCode {
ClusterSkip = 2033,
KnowhereError = 2100,
// timeout or cancel related.
FollyOtherException = 2200,
FollyCancel = 2201
};
namespace impl {
void
@ -87,7 +90,7 @@ class SegcoreError : public std::runtime_error {
}
ErrorCode
get_error_code() {
get_error_code() const {
return error_code_;
}
@ -111,9 +114,9 @@ FailureCStatus(int code, const std::string& msg) {
}
inline CStatus
FailureCStatus(std::exception* ex) {
if (dynamic_cast<SegcoreError*>(ex) != nullptr) {
auto segcore_error = dynamic_cast<SegcoreError*>(ex);
FailureCStatus(const std::exception* ex) {
if (dynamic_cast<const SegcoreError*>(ex) != nullptr) {
auto segcore_error = dynamic_cast<const SegcoreError*>(ex);
return CStatus{static_cast<int>(segcore_error->get_error_code()),
strdup(ex->what())};
}

View File

@ -0,0 +1,24 @@
# Copyright (C) 2019-2020 Zilliz. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under the License
milvus_add_pkg_config("milvus_futures")
set(FUTURES_SRC
Executor.cpp
future_c.cpp
future_test_case_c.cpp
)
add_library(milvus_futures SHARED ${FUTURES_SRC})
target_link_libraries(milvus_futures milvus_common)
install(TARGETS milvus_futures DESTINATION "${CMAKE_INSTALL_LIBDIR}")

View File

@ -0,0 +1,45 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <chrono>
#include "Executor.h"
#include "common/Common.h"
namespace milvus::futures {
const int kNumPriority = 3;
const int kMaxQueueSizeFactor = 16;
folly::Executor::KeepAlive<>
getGlobalCPUExecutor() {
static ExecutorSingleton singleton;
return singleton.GetCPUExecutor();
}
folly::Executor::KeepAlive<>
ExecutorSingleton::GetCPUExecutor() {
// TODO: fix the executor with a non-block way.
std::call_once(cpu_executor_once_, [this]() {
int num_threads = milvus::CPU_NUM;
auto num_priority = kNumPriority;
auto max_queue_size = num_threads * kMaxQueueSizeFactor;
cpu_executor_ = std::make_unique<folly::CPUThreadPoolExecutor>(
num_threads,
std::make_unique<folly::PriorityLifoSemMPMCQueue<
folly::CPUThreadPoolExecutor::CPUTask,
folly::QueueBehaviorIfFull::BLOCK>>(num_priority,
max_queue_size),
std::make_shared<folly::NamedThreadFactory>("MILVUS_CPU_"));
});
return folly::getKeepAliveToken(cpu_executor_.get());
}
}; // namespace milvus::futures

View File

@ -0,0 +1,40 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#pragma once
#include <memory>
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/executors/task_queue/PriorityLifoSemMPMCQueue.h>
#include <folly/system/HardwareConcurrency.h>
namespace milvus::futures {
folly::Executor::KeepAlive<>
getGlobalCPUExecutor();
class ExecutorSingleton {
public:
ExecutorSingleton() = default;
ExecutorSingleton(const ExecutorSingleton&) = delete;
ExecutorSingleton(ExecutorSingleton&&) noexcept = delete;
folly::Executor::KeepAlive<>
GetCPUExecutor();
private:
std::unique_ptr<folly::Executor> cpu_executor_;
std::once_flag cpu_executor_once_;
};
}; // namespace milvus::futures

View File

@ -0,0 +1,226 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#pragma once
#include <stdlib.h>
#include <common/EasyAssert.h>
#include <folly/CancellationToken.h>
#include <folly/futures/Future.h>
#include <folly/futures/SharedPromise.h>
#include "future_c_types.h"
#include "LeakyResult.h"
#include "Ready.h"
namespace milvus::futures {
/// @brief a virtual class that represents a future can be polymorphic called by CGO code.
/// implemented by Future template.
class IFuture {
public:
/// @brief cancel the future with the given exception.
/// After cancelled is called, the underlying async function will receive cancellation.
/// It just a signal notification, the cancellation is handled by user-defined.
/// If the underlying async function ignore the cancellation signal, the Future is still blocked.
virtual void
cancel() = 0;
/// @brief check if the future is ready or canceled.
/// @return true if the future is ready or canceled, otherwise false.
virtual bool
isReady() = 0;
/// @brief register a callback that will be called when the future is ready or future has been ready.
virtual void
registerReadyCallback(CUnlockGoMutexFn unlockFn, CLockedGoMutex* mutex) = 0;
/// @brief get the result of the future. it must be called if future is ready.
/// the first element of the pair is the result,
/// the second element of the pair is the exception.
/// !!! It can only be called once,
/// and the result need to be manually released by caller after these call.
virtual std::pair<void*, CStatus>
leakyGet() = 0;
/// @brief leaked future object created by method `Future<R>::createLeakedFuture` can be droped by these method.
static void
releaseLeakedFuture(IFuture* future) {
delete future;
}
};
/// @brief a class that represents a cancellation token
class CancellationToken : public folly::CancellationToken {
public:
CancellationToken(folly::CancellationToken&& token) noexcept
: folly::CancellationToken(std::move(token)) {
}
/// @brief check if the token is cancelled, throw a FutureCancellation exception if it is.
void
throwIfCancelled() const {
if (isCancellationRequested()) {
throw folly::FutureCancellation();
}
}
};
/// @brief Future is a class that bound a future with a result for
/// using by cgo.
/// @tparam R is the return type of the producer function.
template <class R>
class Future : public IFuture {
public:
/// @brief do a async operation which will produce a result.
/// fn returns pointer to R (leaked, default memory allocator) if it is success, otherwise it will throw a exception.
/// returned result or exception will be handled by consumer side.
template <typename Fn,
typename = std::enable_if<
std::is_invocable_r_v<R*, Fn, CancellationToken>>>
static std::unique_ptr<Future<R>>
async(folly::Executor::KeepAlive<> executor,
int priority,
Fn&& fn) noexcept {
auto future = std::make_unique<Future<R>>();
// setup the interrupt handler for the promise.
future->setInterruptHandler();
// start async function.
future->asyncProduce(executor, priority, std::forward<Fn>(fn));
// register consume callback function.
future->registerConsumeCallback(executor, priority);
return future;
}
/// use `async`.
Future()
: ready_(std::make_shared<Ready<LeakyResult<R>>>()),
promise_(std::make_shared<folly::SharedPromise<R*>>()),
cancellation_source_() {
}
Future(const Future<R>&) = delete;
Future(Future<R>&&) noexcept = default;
Future&
operator=(const Future<R>&) = delete;
Future&
operator=(Future<R>&&) noexcept = default;
/// @brief see `IFuture::cancel`
void
cancel() noexcept override {
promise_->getSemiFuture().cancel();
}
/// @brief see `IFuture::registerReadyCallback`
void
registerReadyCallback(CUnlockGoMutexFn unlockFn,
CLockedGoMutex* mutex) noexcept override {
ready_->callOrRegisterCallback(
[unlockFn = unlockFn, mutex = mutex]() { unlockFn(mutex); });
}
/// @brief see `IFuture::isReady`
bool
isReady() noexcept override {
return ready_->isReady();
}
/// @brief see `IFuture::leakyGet`
std::pair<void*, CStatus>
leakyGet() noexcept override {
auto result = std::move(*ready_).getValue();
return result.leakyGet();
}
private:
/// @brief set the interrupt handler for the promise used in async produce arm.
void
setInterruptHandler() {
promise_->setInterruptHandler([cancellation_source =
cancellation_source_,
ready = ready_](
const folly::exception_wrapper& ew) {
// 1. set the result to perform a fast fail.
// 2. set the cancellation to the source to notify cancellation to the consumers.
ew.handle(
[&](const folly::FutureCancellation& e) {
cancellation_source.requestCancellation();
},
[&](const folly::FutureTimeout& e) {
cancellation_source.requestCancellation();
});
});
}
/// @brief do the R produce operation in async way.
template <typename Fn,
typename... Args,
typename = std::enable_if<
std::is_invocable_r_v<R*, Fn, CancellationToken>>>
void
asyncProduce(folly::Executor::KeepAlive<> executor, int priority, Fn&& fn) {
// start produce process async.
auto cancellation_token =
CancellationToken(cancellation_source_.getToken());
auto runner = [fn = std::forward<Fn>(fn),
cancellation_token = std::move(cancellation_token)]() {
return fn(cancellation_token);
};
// the runner is executed may be executed in different thread.
// so manage the promise with shared_ptr.
auto thenRunner = [promise = promise_, runner = std::move(runner)](
auto&&) { promise->setWith(std::move(runner)); };
folly::makeSemiFuture().via(executor, priority).then(thenRunner);
}
/// @brief async consume the result of the future.
void
registerConsumeCallback(folly::Executor::KeepAlive<> executor,
int priority) noexcept {
// set up the result consume arm and exception consume arm.
promise_->getSemiFuture()
.via(executor, priority)
.thenValue(
[ready = ready_](R* r) { ready->setValue(LeakyResult<R>(r)); })
.thenError(folly::tag_t<folly::FutureCancellation>{},
[ready = ready_](const folly::FutureCancellation& e) {
ready->setValue(
LeakyResult<R>(milvus::FollyCancel, e.what()));
})
.thenError(folly::tag_t<folly::FutureException>{},
[ready = ready_](const folly::FutureException& e) {
ready->setValue(LeakyResult<R>(
milvus::FollyOtherException, e.what()));
})
.thenError(folly::tag_t<milvus::SegcoreError>{},
[ready = ready_](const milvus::SegcoreError& e) {
ready->setValue(LeakyResult<R>(
static_cast<int>(e.get_error_code()), e.what()));
})
.thenError(folly::tag_t<std::exception>{},
[ready = ready_](const std::exception& e) {
ready->setValue(LeakyResult<R>(
milvus::UnexpectedError, e.what()));
});
}
private:
std::shared_ptr<Ready<LeakyResult<R>>> ready_;
std::shared_ptr<folly::SharedPromise<R*>> promise_;
folly::CancellationSource cancellation_source_;
};
}; // namespace milvus::futures

View File

@ -0,0 +1,112 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#pragma once
#include <optional>
#include <string>
#include <exception>
#include <memory>
#include <functional>
#include <common/type_c.h>
namespace milvus::futures {
/// @brief LeakyResult is a class that holds the result that can be leaked.
/// @tparam R is a type to real result that can be leak after get operation.
template <class R>
class LeakyResult {
public:
/// @brief default construct a empty Result, which is just used for easy contruction.
LeakyResult() {
}
/// @brief create a LeakyResult with error code and error message which means failure.
/// @param error_code see CStatus difinition.
/// @param error_msg see CStatus difinition.
LeakyResult(int error_code, const std::string& error_msg) {
auto msg = strdup(error_msg.c_str());
status_ = std::make_optional(CStatus{error_code, msg});
}
/// @brief create a LeakyResult with a result which means success.
/// @param r
LeakyResult(R* r) : result_(std::make_optional(r)) {
}
LeakyResult(const LeakyResult<R>&) = delete;
LeakyResult(LeakyResult<R>&& other) noexcept {
if (other.result_.has_value()) {
result_ = std::move(other.result_);
other.result_.reset();
}
if (other.status_.has_value()) {
status_ = std::move(other.status_);
other.status_.reset();
}
}
LeakyResult&
operator=(const LeakyResult<R>&) = delete;
LeakyResult&
operator=(LeakyResult<R>&& other) noexcept {
if (this != &other) {
if (other.result_.has_value()) {
result_ = std::move(other.result_);
other.result_.reset();
}
if (other.status_.has_value()) {
status_ = std::move(other.status_);
other.status_.reset();
}
}
return *this;
}
/// @brief get the Result or CStatus from LeakyResult, performed a manual memory management.
/// caller has responsibitiy to release if void* is not nullptr or cstatus is not nullptr.
/// @return a pair of void* and CStatus is returned, void* => R*.
/// condition (void* == nullptr and CStatus is failure) or (void* != nullptr and CStatus is success) is met.
/// release operation of CStatus see common/type_c.h.
std::pair<void*, CStatus>
leakyGet() {
if (result_.has_value()) {
R* result_ptr = result_.value();
result_.reset();
return std::make_pair<void*, CStatus>(result_ptr,
CStatus{0, nullptr});
}
if (status_.has_value()) {
CStatus status = status_.value();
status_.reset();
return std::make_pair<void*, CStatus>(
nullptr, CStatus{status.error_code, status.error_msg});
}
throw std::logic_error("get on a not ready LeakyResult");
}
~LeakyResult() {
if (result_.has_value()) {
delete result_.value();
}
if (status_.has_value()) {
free((char*)(status_.value().error_msg));
}
}
private:
std::optional<CStatus> status_;
std::optional<R*> result_;
};
}; // namespace milvus::futures

View File

@ -0,0 +1,97 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#pragma once
#include <mutex>
#include <optional>
#include <functional>
#include <vector>
namespace milvus::futures {
/// @brief Ready is a class that holds a value of type T.
/// value of Ready can be only set into ready by once,
/// and allows to register callbacks to be called when the value is ready.
template <class T>
class Ready {
public:
Ready() : is_ready_(false){};
Ready(const Ready<T>&) = delete;
Ready(Ready<T>&&) noexcept = default;
Ready&
operator=(const Ready<T>&) = delete;
Ready&
operator=(Ready<T>&&) noexcept = default;
/// @brief set the value into Ready.
void
setValue(T&& value) {
mutex_.lock();
value_ = std::move(value);
is_ready_ = true;
std::vector<std::function<void()>> callbacks(std::move(callbacks_));
mutex_.unlock();
// perform all callbacks which is registered before value is ready.
for (auto& callback : callbacks) {
callback();
}
}
/// @brief get the value from Ready.
/// @return ready value.
T
getValue() && {
std::lock_guard<std::mutex> lock(mutex_);
if (!is_ready_) {
throw std::runtime_error("Value is not ready");
}
auto v(std::move(value_.value()));
value_.reset();
return std::move(v);
}
/// @brief check if the value is ready.
bool
isReady() const {
const std::lock_guard<std::mutex> lock(mutex_);
return is_ready_;
}
/// @brief register a callback into Ready if value is not ready, otherwise call it directly.
template <typename Fn, typename = std::enable_if<std::is_invocable_v<Fn>>>
void
callOrRegisterCallback(Fn&& fn) {
mutex_.lock();
// call if value is ready,
// otherwise register as a callback to be called when value is ready.
if (is_ready_) {
mutex_.unlock();
fn();
return;
}
callbacks_.push_back(std::forward<Fn>(fn));
mutex_.unlock();
}
private:
std::optional<T> value_;
mutable std::mutex mutex_;
std::vector<std::function<void()>> callbacks_;
bool is_ready_;
};
}; // namespace milvus::futures

View File

@ -0,0 +1,51 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <memory>
#include "future_c.h"
#include "folly/init/Init.h"
#include "Future.h"
extern "C" void
future_cancel(CFuture* future) {
static_cast<milvus::futures::IFuture*>(static_cast<void*>(future))
->cancel();
}
extern "C" bool
future_is_ready(CFuture* future) {
return static_cast<milvus::futures::IFuture*>(static_cast<void*>(future))
->isReady();
}
extern "C" void
future_register_ready_callback(CFuture* future,
CUnlockGoMutexFn unlockFn,
CLockedGoMutex* mutex) {
static_cast<milvus::futures::IFuture*>(static_cast<void*>(future))
->registerReadyCallback(unlockFn, mutex);
}
extern "C" CStatus
future_leak_and_get(CFuture* future, void** result) {
auto [r, s] =
static_cast<milvus::futures::IFuture*>(static_cast<void*>(future))
->leakyGet();
*result = r;
return s;
}
extern "C" void
future_destroy(CFuture* future) {
milvus::futures::IFuture::releaseLeakedFuture(
static_cast<milvus::futures::IFuture*>(static_cast<void*>(future)));
}

View File

@ -0,0 +1,44 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#pragma once
#include "future_c_types.h"
#include "common/type_c.h"
#ifdef __cplusplus
extern "C" {
#endif
void
future_cancel(CFuture* future);
bool
future_is_ready(CFuture* future);
void
future_register_ready_callback(CFuture* future,
CUnlockGoMutexFn unlockFn,
CLockedGoMutex* mutex);
CStatus
future_leak_and_get(CFuture* future, void** result);
// TODO: only for testing, add test macro for this function.
CFuture*
future_create_test_case(int interval, int loop_cnt, int caseNo);
void
future_destroy(CFuture* future);
#ifdef __cplusplus
}
#endif

View File

@ -0,0 +1,26 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#pragma once
#ifdef __cplusplus
extern "C" {
#endif
typedef struct CFuture CFuture;
typedef struct CLockedGoMutex CLockedGoMutex;
typedef void (*CUnlockGoMutexFn)(CLockedGoMutex* mutex);
#ifdef __cplusplus
}
#endif

View File

@ -0,0 +1,42 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include "Future.h"
#include "Executor.h"
extern "C" CFuture*
future_create_test_case(int interval, int loop_cnt, int case_no) {
auto future = milvus::futures::Future<int>::async(
milvus::futures::getGlobalCPUExecutor(),
0,
[interval = interval, loop_cnt = loop_cnt, case_no = case_no](
milvus::futures::CancellationToken token) {
for (int i = 0; i < loop_cnt; i++) {
if (case_no != 0) {
token.throwIfCancelled();
}
std::this_thread::sleep_for(
std::chrono::milliseconds(interval));
}
switch (case_no) {
case 1:
throw std::runtime_error("case 1");
case 2:
throw folly::FutureNoExecutor();
case 3:
throw milvus::SegcoreError(milvus::NotImplemented,
"case 3");
}
return new int(case_no);
});
return static_cast<CFuture*>(static_cast<void*>(
static_cast<milvus::futures::IFuture*>(future.release())));
}

View File

@ -0,0 +1,9 @@
libdir=@CMAKE_INSTALL_FULL_LIBDIR@
includedir=@CMAKE_INSTALL_FULL_INCLUDEDIR@
Name: Milvus Futures
Description: Futures modules for Milvus
Version: @MILVUS_VERSION@
Libs: -L${libdir} -lmilvus_futures
Cflags: -I${includedir}

View File

@ -67,6 +67,7 @@ set(MILVUS_TEST_FILES
test_group_by.cpp
test_regex_query_util.cpp
test_regex_query.cpp
test_futures.cpp
)
if ( INDEX_ENGINE STREQUAL "cardinal" )

View File

@ -0,0 +1,210 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <gtest/gtest.h>
#include "futures/Future.h"
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <stdlib.h>
#include <mutex>
#include <exception>
using namespace milvus::futures;
TEST(Futures, LeakyResult) {
{
LeakyResult<int> leaky_result;
ASSERT_ANY_THROW(leaky_result.leakyGet());
}
{
auto leaky_result = LeakyResult<int>(1, "error");
auto [r, s] = leaky_result.leakyGet();
ASSERT_EQ(r, nullptr);
ASSERT_EQ(s.error_code, 1);
ASSERT_STREQ(s.error_msg, "error");
free((char*)(s.error_msg));
}
{
auto leaky_result = LeakyResult<int>(new int(1));
auto [r, s] = leaky_result.leakyGet();
ASSERT_NE(r, nullptr);
ASSERT_EQ(*(int*)(r), 1);
ASSERT_EQ(s.error_code, 0);
ASSERT_EQ(s.error_msg, nullptr);
delete (int*)(r);
}
{
LeakyResult<int> leaky_result(1, "error");
LeakyResult<int> leaky_result_moved(std::move(leaky_result));
auto [r, s] = leaky_result_moved.leakyGet();
ASSERT_EQ(r, nullptr);
ASSERT_EQ(s.error_code, 1);
ASSERT_STREQ(s.error_msg, "error");
free((char*)(s.error_msg));
}
{
LeakyResult<int> leaky_result(1, "error");
LeakyResult<int> leaky_result_moved;
leaky_result_moved = std::move(leaky_result);
auto [r, s] = leaky_result_moved.leakyGet();
ASSERT_EQ(r, nullptr);
ASSERT_EQ(s.error_code, 1);
ASSERT_STREQ(s.error_msg, "error");
free((char*)(s.error_msg));
}
}
TEST(Futures, Ready) {
Ready<int> ready;
int a = 0;
ready.callOrRegisterCallback([&a]() { a++; });
ASSERT_EQ(a, 0);
ASSERT_FALSE(ready.isReady());
ready.setValue(1);
ASSERT_EQ(a, 1);
ASSERT_TRUE(ready.isReady());
ready.callOrRegisterCallback([&a]() { a++; });
ASSERT_EQ(a, 2);
ASSERT_EQ(std::move(ready).getValue(), 1);
}
TEST(Futures, Future) {
folly::CPUThreadPoolExecutor executor(2);
// success path.
{
// try a async function
auto future = milvus::futures::Future<int>::async(
&executor, 0, [](milvus::futures::CancellationToken token) {
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
return new int(1);
});
ASSERT_FALSE(future->isReady());
std::mutex mu;
mu.lock();
future->registerReadyCallback(
[](CLockedGoMutex* mutex) { ((std::mutex*)(mutex))->unlock(); },
(CLockedGoMutex*)(&mu));
mu.lock();
ASSERT_TRUE(future->isReady());
auto [r, s] = future->leakyGet();
ASSERT_NE(r, nullptr);
ASSERT_EQ(*(int*)(r), 1);
ASSERT_EQ(s.error_code, 0);
ASSERT_EQ(s.error_msg, nullptr);
delete (int*)(r);
}
// error path.
{
// try a async function
auto future = milvus::futures::Future<int>::async(
&executor, 0, [](milvus::futures::CancellationToken token) {
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
throw milvus::SegcoreError(milvus::NotImplemented,
"unimplemented");
return new int(1);
});
ASSERT_FALSE(future->isReady());
std::mutex mu;
mu.lock();
future->registerReadyCallback(
[](CLockedGoMutex* mutex) { ((std::mutex*)(mutex))->unlock(); },
(CLockedGoMutex*)(&mu));
mu.lock();
ASSERT_TRUE(future->isReady());
auto [r, s] = future->leakyGet();
ASSERT_EQ(r, nullptr);
ASSERT_EQ(s.error_code, milvus::NotImplemented);
ASSERT_STREQ(s.error_msg, "unimplemented");
free((char*)(s.error_msg));
}
{
// try a async function
auto future = milvus::futures::Future<int>::async(
&executor, 0, [](milvus::futures::CancellationToken token) {
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
throw std::runtime_error("unimplemented");
return new int(1);
});
ASSERT_FALSE(future->isReady());
std::mutex mu;
mu.lock();
future->registerReadyCallback(
[](CLockedGoMutex* mutex) { ((std::mutex*)(mutex))->unlock(); },
(CLockedGoMutex*)(&mu));
mu.lock();
ASSERT_TRUE(future->isReady());
auto [r, s] = future->leakyGet();
ASSERT_EQ(r, nullptr);
ASSERT_EQ(s.error_code, milvus::UnexpectedError);
free((char*)(s.error_msg));
}
{
// try a async function
auto future = milvus::futures::Future<int>::async(
&executor, 0, [](milvus::futures::CancellationToken token) {
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
throw folly::FutureNotReady();
return new int(1);
});
ASSERT_FALSE(future->isReady());
std::mutex mu;
mu.lock();
future->registerReadyCallback(
[](CLockedGoMutex* mutex) { ((std::mutex*)(mutex))->unlock(); },
(CLockedGoMutex*)(&mu));
mu.lock();
ASSERT_TRUE(future->isReady());
auto [r, s] = future->leakyGet();
ASSERT_EQ(r, nullptr);
ASSERT_EQ(s.error_code, milvus::FollyOtherException);
free((char*)(s.error_msg));
}
// cancellation path.
{
// try a async function
auto future = milvus::futures::Future<int>::async(
&executor, 0, [](milvus::futures::CancellationToken token) {
for (int i = 0; i < 10; i++) {
token.throwIfCancelled();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
return new int(1);
});
ASSERT_FALSE(future->isReady());
future->cancel();
std::mutex mu;
mu.lock();
future->registerReadyCallback(
[](CLockedGoMutex* mutex) { ((std::mutex*)(mutex))->unlock(); },
(CLockedGoMutex*)(&mu));
mu.lock();
ASSERT_TRUE(future->isReady());
auto [r, s] = future->leakyGet();
ASSERT_EQ(r, nullptr);
ASSERT_EQ(s.error_code, milvus::FollyCancel);
}
}

View File

@ -0,0 +1,27 @@
package cgo
/*
#cgo pkg-config: milvus_common
#include "common/type_c.h"
#include <stdlib.h>
*/
import "C"
import (
"unsafe"
"github.com/milvus-io/milvus/pkg/util/merr"
)
func ConsumeCStatusIntoError(status *C.CStatus) error {
if status.error_code == 0 {
return nil
}
errorCode := status.error_code
errorMsg := C.GoString(status.error_msg)
getCGOCaller().call("free", func() {
C.free(unsafe.Pointer(status.error_msg))
})
return merr.SegcoreError(int32(errorCode), errorMsg)
}

View File

@ -0,0 +1,190 @@
package cgo
/*
#cgo pkg-config: milvus_futures
#include "futures/future_c.h"
#include <stdlib.h>
extern void unlockMutex(void*);
static inline void unlockMutexOnC(CLockedGoMutex* m) {
unlockMutex((void*)(m));
}
static inline void future_go_register_ready_callback(CFuture* f, CLockedGoMutex* m) {
future_register_ready_callback(f, unlockMutexOnC, m);
}
*/
import "C"
import (
"context"
"sync"
"unsafe"
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus/pkg/util/merr"
)
var ErrConsumed = errors.New("future is already consumed")
// Would put this in futures.go but for the documented issue with
// exports and functions in preamble
// (https://code.google.com/p/go-wiki/wiki/cgo#Global_functions)
//
//export unlockMutex
func unlockMutex(p unsafe.Pointer) {
m := (*sync.Mutex)(p)
m.Unlock()
}
type basicFuture interface {
// Context return the context of the future.
Context() context.Context
// BlockUntilReady block until the future is ready or canceled.
// caller can call this method multiple times in different concurrent unit.
BlockUntilReady()
// cancel the future with error.
cancel(error)
}
type Future interface {
basicFuture
// BlockAndLeakyGet block until the future is ready or canceled, and return the leaky result.
// Caller should only call once for BlockAndLeakyGet, otherwise the ErrConsumed will returned.
// Caller will get the merr.ErrSegcoreCancel or merr.ErrSegcoreTimeout respectively if the future is canceled or timeout.
// Caller will get other error if the underlying cgo function throws, otherwise caller will get result.
// Caller should free the result after used (defined by caller), otherwise the memory of result is leaked.
BlockAndLeakyGet() (unsafe.Pointer, error)
// Release the resource of the future.
// !!! Release is not concurrent safe with other methods.
// It should be called only once after all method of future is returned.
Release()
}
type (
CFuturePtr unsafe.Pointer
CGOAsyncFunction = func() CFuturePtr
)
// Async is a helper function to call a C async function that returns a future.
func Async(ctx context.Context, f CGOAsyncFunction, opts ...Opt) Future {
options := getDefaultOpt()
// apply options.
for _, opt := range opts {
opt(options)
}
// create a future for caller to use.
var cFuturePtr *C.CFuture
getCGOCaller().call(options.name, func() {
cFuturePtr = (*C.CFuture)(f())
})
ctx, cancel := context.WithCancel(ctx)
future := &futureImpl{
closure: f,
ctx: ctx,
ctxCancel: cancel,
releaserOnce: sync.Once{},
future: cFuturePtr,
opts: options,
state: newFutureState(),
}
// register the future to do timeout notification.
futureManager.Register(future)
return future
}
type futureImpl struct {
ctx context.Context
ctxCancel context.CancelFunc
future *C.CFuture
closure CGOAsyncFunction
opts *options
state futureState
releaserOnce sync.Once
}
func (f *futureImpl) Context() context.Context {
return f.ctx
}
func (f *futureImpl) BlockUntilReady() {
f.blockUntilReady()
}
func (f *futureImpl) BlockAndLeakyGet() (unsafe.Pointer, error) {
f.blockUntilReady()
if !f.state.intoConsumed() {
return nil, ErrConsumed
}
var ptr unsafe.Pointer
var status C.CStatus
getCGOCaller().call("future_leak_and_get", func() {
status = C.future_leak_and_get(f.future, &ptr)
})
err := ConsumeCStatusIntoError(&status)
if errors.Is(err, merr.ErrSegcoreFollyCancel) {
// mark the error with context error.
return nil, errors.Mark(err, f.ctx.Err())
}
return ptr, err
}
func (f *futureImpl) Release() {
// block until ready to release the future.
f.blockUntilReady()
// release the future.
getCGOCaller().call("future_destroy", func() {
C.future_destroy(f.future)
})
}
func (f *futureImpl) cancel(err error) {
if !f.state.checkUnready() {
// only unready future can be canceled.
// a ready future' cancel make no sense.
return
}
if errors.IsAny(err, context.DeadlineExceeded, context.Canceled) {
getCGOCaller().call("future_cancel", func() {
C.future_cancel(f.future)
})
return
}
panic("unreachable: invalid cancel error type")
}
func (f *futureImpl) blockUntilReady() {
if !f.state.checkUnready() {
// only unready future should be block until ready.
return
}
mu := &sync.Mutex{}
mu.Lock()
getCGOCaller().call("future_go_register_ready_callback", func() {
C.future_go_register_ready_callback(f.future, (*C.CLockedGoMutex)(unsafe.Pointer(mu)))
})
mu.Lock()
// mark the future as ready at go side to avoid more cgo calls.
f.state.intoReady()
// notify the future manager that the future is ready.
f.ctxCancel()
if f.opts.releaser != nil {
f.releaserOnce.Do(f.opts.releaser)
}
}

View File

@ -0,0 +1,278 @@
package cgo
import (
"context"
"fmt"
"os"
"runtime"
"sync"
"testing"
"time"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
func TestMain(m *testing.M) {
paramtable.Init()
InitCGO()
exitCode := m.Run()
if exitCode > 0 {
os.Exit(exitCode)
}
}
func TestFutureWithSuccessCase(t *testing.T) {
// Test success case.
future := createFutureWithTestCase(context.Background(), testCase{
interval: 100 * time.Millisecond,
loopCnt: 10,
caseNo: 100,
})
defer future.Release()
start := time.Now()
future.BlockUntilReady() // test block until ready too.
result, err := future.BlockAndLeakyGet()
assert.NoError(t, err)
assert.Equal(t, 100, getCInt(result))
// The inner function sleep 1 seconds, so the future cost must be greater than 0.5 seconds.
assert.Greater(t, time.Since(start).Seconds(), 0.5)
// free the result after used.
freeCInt(result)
runtime.GC()
_, err = future.BlockAndLeakyGet()
assert.ErrorIs(t, err, ErrConsumed)
assert.Eventually(t, func() bool {
return unreleasedCnt.Load() == 0
}, time.Second, time.Millisecond*100)
}
func TestFutureWithCaseNoInterrupt(t *testing.T) {
// Test success case.
future := createFutureWithTestCase(context.Background(), testCase{
interval: 100 * time.Millisecond,
loopCnt: 10,
caseNo: caseNoNoInterrupt,
})
defer future.Release()
start := time.Now()
future.BlockUntilReady() // test block until ready too.
result, err := future.BlockAndLeakyGet()
assert.NoError(t, err)
assert.Equal(t, 0, getCInt(result))
// The inner function sleep 1 seconds, so the future cost must be greater than 0.5 seconds.
assert.Greater(t, time.Since(start).Seconds(), 0.5)
// free the result after used.
freeCInt(result)
// Test cancellation on no interrupt handling case.
start = time.Now()
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
future = createFutureWithTestCase(ctx, testCase{
interval: 100 * time.Millisecond,
loopCnt: 20,
caseNo: caseNoNoInterrupt,
})
defer future.Release()
result, err = future.BlockAndLeakyGet()
// the future is timeout by the context after 200ms, but the underlying task doesn't handle the cancel, the future will return after 2s.
assert.Greater(t, time.Since(start).Seconds(), 2.0)
assert.NoError(t, err)
assert.NotNil(t, result)
assert.Equal(t, 0, getCInt(result))
freeCInt(result)
}
// TestFutures test the future implementation.
func TestFutures(t *testing.T) {
// Test failed case, throw folly exception.
future := createFutureWithTestCase(context.Background(), testCase{
interval: 100 * time.Millisecond,
loopCnt: 10,
caseNo: caseNoThrowStdException,
})
defer future.Release()
start := time.Now()
future.BlockUntilReady() // test block until ready too.
result, err := future.BlockAndLeakyGet()
assert.Error(t, err)
assert.ErrorIs(t, err, merr.ErrSegcoreUnsupported)
assert.Nil(t, result)
// The inner function sleep 1 seconds, so the future cost must be greater than 0.5 seconds.
assert.Greater(t, time.Since(start).Seconds(), 0.5)
// Test failed case, throw std exception.
future = createFutureWithTestCase(context.Background(), testCase{
interval: 100 * time.Millisecond,
loopCnt: 10,
caseNo: caseNoThrowFollyException,
})
defer future.Release()
start = time.Now()
future.BlockUntilReady() // test block until ready too.
result, err = future.BlockAndLeakyGet()
assert.Error(t, err)
assert.ErrorIs(t, err, merr.ErrSegcoreFollyOtherException)
assert.Nil(t, result)
// The inner function sleep 1 seconds, so the future cost must be greater than 0.5 seconds.
assert.Greater(t, time.Since(start).Seconds(), 0.5)
// free the result after used.
// Test failed case, throw std exception.
future = createFutureWithTestCase(context.Background(), testCase{
interval: 100 * time.Millisecond,
loopCnt: 10,
caseNo: caseNoThrowSegcoreException,
})
defer future.Release()
start = time.Now()
future.BlockUntilReady() // test block until ready too.
result, err = future.BlockAndLeakyGet()
assert.Error(t, err)
assert.ErrorIs(t, err, merr.ErrSegcorePretendFinished)
assert.Nil(t, result)
// The inner function sleep 1 seconds, so the future cost must be greater than 0.5 seconds.
assert.Greater(t, time.Since(start).Seconds(), 0.5)
// free the result after used.
// Test cancellation.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
future = createFutureWithTestCase(ctx, testCase{
interval: 100 * time.Millisecond,
loopCnt: 20,
caseNo: 100,
})
defer future.Release()
// canceled before the future(2s) is ready.
go func() {
time.Sleep(200 * time.Millisecond)
cancel()
}()
start = time.Now()
result, err = future.BlockAndLeakyGet()
// the future is canceled by the context after 200ms, so the future should be done in 1s but not 2s.
assert.Less(t, time.Since(start).Seconds(), 1.0)
assert.Error(t, err)
assert.ErrorIs(t, err, merr.ErrSegcoreFollyCancel)
assert.True(t, errors.Is(err, context.Canceled))
assert.Nil(t, result)
// Test cancellation.
ctx, cancel = context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
future = createFutureWithTestCase(ctx, testCase{
interval: 100 * time.Millisecond,
loopCnt: 20,
caseNo: 100,
})
defer future.Release()
start = time.Now()
result, err = future.BlockAndLeakyGet()
// the future is timeout by the context after 200ms, so the future should be done in 1s but not 2s.
assert.Less(t, time.Since(start).Seconds(), 1.0)
assert.Error(t, err)
assert.ErrorIs(t, err, merr.ErrSegcoreFollyCancel)
assert.True(t, errors.Is(err, context.DeadlineExceeded))
assert.Nil(t, result)
runtime.GC()
assert.Eventually(t, func() bool {
return unreleasedCnt.Load() == 0
}, time.Second, time.Millisecond*100)
}
func TestConcurrent(t *testing.T) {
// Test is compatible with old implementation of fast fail future.
// So it's complicated and not easy to understand.
wg := sync.WaitGroup{}
for i := 0; i < 3; i++ {
wg.Add(4)
// success case
go func() {
defer wg.Done()
// Test success case.
future := createFutureWithTestCase(context.Background(), testCase{
interval: 100 * time.Millisecond,
loopCnt: 10,
caseNo: 100,
})
defer future.Release()
result, err := future.BlockAndLeakyGet()
assert.NoError(t, err)
assert.Equal(t, 100, getCInt(result))
freeCInt(result)
}()
// fail case
go func() {
defer wg.Done()
// Test success case.
future := createFutureWithTestCase(context.Background(), testCase{
interval: 100 * time.Millisecond,
loopCnt: 10,
caseNo: caseNoThrowStdException,
})
defer future.Release()
result, err := future.BlockAndLeakyGet()
assert.Error(t, err)
assert.Nil(t, result)
}()
// timeout case
go func() {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
future := createFutureWithTestCase(ctx, testCase{
interval: 100 * time.Millisecond,
loopCnt: 20,
caseNo: 100,
})
defer future.Release()
result, err := future.BlockAndLeakyGet()
assert.Error(t, err)
assert.ErrorIs(t, err, merr.ErrSegcoreFollyCancel)
assert.True(t, errors.Is(err, context.DeadlineExceeded))
assert.Nil(t, result)
}()
// no interrupt with timeout case
go func() {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
future := createFutureWithTestCase(ctx, testCase{
interval: 100 * time.Millisecond,
loopCnt: 10,
caseNo: caseNoNoInterrupt,
})
defer future.Release()
result, err := future.BlockAndLeakyGet()
assert.NoError(t, err)
assert.Equal(t, 0, getCInt(result))
freeCInt(result)
}()
}
wg.Wait()
assert.Eventually(t, func() bool {
stat := futureManager.Stat()
fmt.Printf("active count: %d\n", stat.ActiveCount)
return stat.ActiveCount == 0
}, 5*time.Second, 100*time.Millisecond)
runtime.GC()
assert.Eventually(t, func() bool {
return unreleasedCnt.Load() == 0
}, time.Second, time.Millisecond*100)
}

View File

@ -0,0 +1,57 @@
//go:build test
// +build test
package cgo
/*
#cgo pkg-config: milvus_futures
#include "futures/future_c.h"
#include <stdlib.h>
*/
import "C"
import (
"context"
"time"
"unsafe"
"go.uber.org/atomic"
)
const (
caseNoNoInterrupt int = 0
caseNoThrowStdException int = 1
caseNoThrowFollyException int = 2
caseNoThrowSegcoreException int = 3
)
var unreleasedCnt = atomic.NewInt32(0)
type testCase struct {
interval time.Duration
loopCnt int
caseNo int
}
func createFutureWithTestCase(ctx context.Context, testCase testCase) Future {
f := func() CFuturePtr {
return (CFuturePtr)(C.future_create_test_case(C.int(testCase.interval.Milliseconds()), C.int(testCase.loopCnt), C.int(testCase.caseNo)))
}
future := Async(ctx, f,
WithName("createFutureWithTestCase"),
WithReleaser(func() {
unreleasedCnt.Dec()
}))
unreleasedCnt.Inc()
return future
}
func getCInt(p unsafe.Pointer) int {
return int(*(*C.int)(p))
}
func freeCInt(p unsafe.Pointer) {
C.free(p)
}

View File

@ -0,0 +1,124 @@
package cgo
import (
"math"
"reflect"
"sync"
"go.uber.org/atomic"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/hardware"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
const (
registerIndex = 0
maxSelectCase = 65535
defaultRegisterBuf = 1
)
var (
futureManager *activeFutureManager
initOnce sync.Once
)
// InitCGO initializes the cgo caller and future manager.
// Please call this function before using any cgo utilities.
func InitCGO() {
initOnce.Do(func() {
nodeID := paramtable.GetStringNodeID()
chSize := int64(math.Ceil(float64(hardware.GetCPUNum()) * paramtable.Get().QueryNodeCfg.CGOPoolSizeRatio.GetAsFloat()))
if chSize <= 0 {
chSize = 1
}
caller = &cgoCaller{
// TODO: temporary solution, need to find a better way to set the pool size.
ch: make(chan struct{}, chSize),
nodeID: nodeID,
}
futureManager = newActiveFutureManager(nodeID)
futureManager.Run()
})
}
type futureManagerStat struct {
ActiveCount int64
}
func newActiveFutureManager(nodeID string) *activeFutureManager {
manager := &activeFutureManager{
activeCount: atomic.NewInt64(0),
activeFutures: make([]basicFuture, 0),
cases: make([]reflect.SelectCase, 1),
register: make(chan basicFuture, defaultRegisterBuf),
nodeID: nodeID,
}
manager.cases[0] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(manager.register),
}
return manager
}
// activeFutureManager manages the active futures.
// it will transfer the cancel signal into cgo.
type activeFutureManager struct {
activeCount *atomic.Int64
activeFutures []basicFuture
cases []reflect.SelectCase
register chan basicFuture
nodeID string
}
// Run starts the active future manager.
func (m *activeFutureManager) Run() {
go func() {
for {
m.doSelect()
}
}()
}
// Register registers a future when it's created into the manager.
func (m *activeFutureManager) Register(c basicFuture) {
m.register <- c
}
// Stat returns the stat of the manager, only for testing now.
func (m *activeFutureManager) Stat() futureManagerStat {
return futureManagerStat{
ActiveCount: m.activeCount.Load(),
}
}
// doSelect selects the active futures and cancel the finished ones.
func (m *activeFutureManager) doSelect() {
index, newCancelableObject, _ := reflect.Select(m.getSelectableCases())
if index == registerIndex {
newCancelable := newCancelableObject.Interface().(basicFuture)
m.cases = append(m.cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(newCancelable.Context().Done()),
})
m.activeFutures = append(m.activeFutures, newCancelable)
} else {
m.cases = append(m.cases[:index], m.cases[index+1:]...)
offset := index - 1
// cancel the future and move it into gc manager.
m.activeFutures[offset].cancel(m.activeFutures[offset].Context().Err())
m.activeFutures = append(m.activeFutures[:offset], m.activeFutures[offset+1:]...)
}
activeTotal := len(m.activeFutures)
m.activeCount.Store(int64(activeTotal))
metrics.ActiveFutureTotal.WithLabelValues(
m.nodeID,
).Set(float64(activeTotal))
}
func (m *activeFutureManager) getSelectableCases() []reflect.SelectCase {
if len(m.cases) <= maxSelectCase {
return m.cases
}
return m.cases[0:maxSelectCase]
}

View File

@ -0,0 +1,32 @@
package cgo
func getDefaultOpt() *options {
return &options{
name: "unknown",
releaser: nil,
}
}
type options struct {
name string
releaser func()
}
// Opt is the option type for future.
type Opt func(*options)
// WithReleaser sets the releaser function.
// When a future is ready, the releaser function will be called once.
func WithReleaser(releaser func()) Opt {
return func(o *options) {
o.releaser = releaser
}
}
// WithName sets the name of the future.
// Only used for metrics.
func WithName(name string) Opt {
return func(o *options) {
o.name = name
}
}

42
internal/util/cgo/pool.go Normal file
View File

@ -0,0 +1,42 @@
package cgo
import (
"runtime"
"time"
"github.com/milvus-io/milvus/pkg/metrics"
)
var caller *cgoCaller
// getCGOCaller returns the cgoCaller instance.
func getCGOCaller() *cgoCaller {
return caller
}
// cgoCaller is a limiter to restrict the number of concurrent cgo calls.
type cgoCaller struct {
ch chan struct{}
nodeID string
}
// call calls the work function with a lock to restrict the number of concurrent cgo calls.
// it collect some metrics too.
func (c *cgoCaller) call(name string, work func()) {
start := time.Now()
c.ch <- struct{}{}
queueTime := time.Since(start)
metrics.CGOQueueDuration.WithLabelValues(c.nodeID).Observe(queueTime.Seconds())
runtime.LockOSThread()
defer func() {
runtime.UnlockOSThread()
<-c.ch
metrics.RunningCgoCallTotal.WithLabelValues(c.nodeID).Dec()
total := time.Since(start) - queueTime
metrics.CGODuration.WithLabelValues(c.nodeID, name).Observe(total.Seconds())
}()
metrics.RunningCgoCallTotal.WithLabelValues(c.nodeID).Inc()
work()
}

View File

@ -0,0 +1,38 @@
package cgo
import "go.uber.org/atomic"
const (
stateUnready int32 = iota
stateReady
stateConsumed
)
// newFutureState creates a new futureState.
func newFutureState() futureState {
return futureState{
inner: atomic.NewInt32(stateUnready),
}
}
// futureState is a state machine for future.
// unready --BlockUntilReady--> ready --BlockAndLeakyGet--> consumed
type futureState struct {
inner *atomic.Int32
}
// intoReady sets the state to ready.
func (s *futureState) intoReady() {
s.inner.CompareAndSwap(stateUnready, stateReady)
}
// intoConsumed sets the state to consumed.
// if the state is not ready, it does nothing and returns false.
func (s *futureState) intoConsumed() bool {
return s.inner.CompareAndSwap(stateReady, stateConsumed)
}
// checkUnready checks if the state is unready.
func (s *futureState) checkUnready() bool {
return s.inner.Load() == stateUnready
}

View File

@ -0,0 +1,66 @@
package metrics
import (
"sync"
"github.com/prometheus/client_golang/prometheus"
)
var (
subsystemCGO = "cgo"
cgoLabelName = "name"
once sync.Once
ActiveFutureTotal = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: subsystemCGO,
Name: "active_future_total",
Help: "Total number of active futures.",
}, []string{
nodeIDLabelName,
},
)
RunningCgoCallTotal = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: subsystemCGO,
Name: "running_cgo_call_total",
Help: "Total number of running cgo calls.",
}, []string{
nodeIDLabelName,
})
CGODuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: subsystemCGO,
Name: "cgo_duration_seconds",
Help: "Histogram of cgo call duration in seconds.",
}, []string{
nodeIDLabelName,
cgoLabelName,
},
)
CGOQueueDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: subsystemCGO,
Name: "cgo_queue_duration_seconds",
Help: "Duration of cgo call in queue.",
}, []string{
nodeIDLabelName,
},
)
)
// RegisterCGOMetrics registers the cgo metrics.
func RegisterCGOMetrics(registry *prometheus.Registry) {
once.Do(func() {
prometheus.MustRegister(RunningCgoCallTotal)
prometheus.MustRegister(CGODuration)
prometheus.MustRegister(CGOQueueDuration)
})
}

View File

@ -157,9 +157,11 @@ var (
ErrInvalidStreamObj = newMilvusError("invalid stream object", 1903, false)
// Segcore related
ErrSegcore = newMilvusError("segcore error", 2000, false)
ErrSegcoreUnsupported = newMilvusError("segcore unsupported error", 2001, false)
ErrSegcorePretendFinished = newMilvusError("segcore pretend finished", 2002, false)
ErrSegcore = newMilvusError("segcore error", 2000, false)
ErrSegcoreUnsupported = newMilvusError("segcore unsupported error", 2001, false)
ErrSegcorePretendFinished = newMilvusError("segcore pretend finished", 2002, false)
ErrSegcoreFollyOtherException = newMilvusError("segcore folly other exception", 2200, false) // throw from segcore.
ErrSegcoreFollyCancel = newMilvusError("segcore Future was canceled", 2201, false) // throw from segcore.
// Do NOT export this,
// never allow programmer using this, keep only for converting unknown error to milvusError

View File

@ -107,6 +107,7 @@ go test -race -cover -tags dynamic,test "${MILVUS_DIR}/util/typeutil/..." -failf
go test -race -cover -tags dynamic,test "${MILVUS_DIR}/util/importutilv2/..." -failfast -count=1 -ldflags="-r ${RPATH}"
go test -race -cover -tags dynamic,test "${MILVUS_DIR}/util/proxyutil/..." -failfast -count=1 -ldflags="-r ${RPATH}"
go test -race -cover -tags dynamic,test "${MILVUS_DIR}/util/initcore/..." -failfast -count=1 -ldflags="-r ${RPATH}"
go test -race -cover -tags dynamic,test "${MILVUS_DIR}/util/cgo/..." -failfast -count=1 -ldflags="-r ${RPATH}"
}
function test_pkg()