From a1a0a56f86fe75cf8b6b97c46ccefc68a92f37f7 Mon Sep 17 00:00:00 2001 From: chyezh Date: Thu, 4 Jul 2024 13:02:09 +0800 Subject: [PATCH] enhance: async search and retrieve in cgo (#34200) issue: #33132 pr: #33133 other pr: #33228, #34084, #33946 - implement future-based cgo utility - async search and retrieve in cgo - modify gc configuration document --------- Signed-off-by: chyezh --- .golangci.yml | 3 + Makefile | 5 +- configs/milvus.yaml | 8 +- internal/core/CMakeLists.txt | 5 + internal/core/src/CMakeLists.txt | 1 + internal/core/src/common/EasyAssert.h | 12 +- internal/core/src/futures/CMakeLists.txt | 24 ++ internal/core/src/futures/Executor.cpp | 29 ++ internal/core/src/futures/Executor.h | 30 ++ internal/core/src/futures/Future.h | 228 +++++++++++++++ internal/core/src/futures/LeakyResult.h | 112 ++++++++ internal/core/src/futures/Ready.h | 97 +++++++ internal/core/src/futures/future_c.cpp | 60 ++++ internal/core/src/futures/future_c.h | 47 +++ internal/core/src/futures/future_c_types.h | 26 ++ .../core/src/futures/future_test_case_c.cpp | 42 +++ .../core/src/futures/milvus_futures.pc.in | 9 + internal/core/src/segcore/CMakeLists.txt | 2 +- internal/core/src/segcore/segment_c.cpp | 192 +++++++------ internal/core/src/segcore/segment_c.h | 42 ++- internal/core/unittest/CMakeLists.txt | 1 + internal/core/unittest/init_gtest.cpp | 3 + internal/core/unittest/test_c_api.cpp | 172 +++++++---- internal/core/unittest/test_futures.cpp | 211 ++++++++++++++ internal/core/unittest/test_group_by.cpp | 4 +- .../unittest/test_utils/c_api_test_utils.h | 21 +- internal/querynodev2/segments/cgo_util.go | 12 +- .../querynodev2/segments/retrieve_test.go | 8 + internal/querynodev2/segments/segment.go | 176 ++++++------ internal/util/cgo/errors.go | 27 ++ internal/util/cgo/executor.go | 36 +++ internal/util/cgo/futures.go | 192 +++++++++++++ internal/util/cgo/futures_test.go | 272 ++++++++++++++++++ internal/util/cgo/futures_test_case.go | 48 ++++ internal/util/cgo/manager_active.go | 114 ++++++++ internal/util/cgo/options.go | 32 +++ internal/util/cgo/pool.go | 56 ++++ internal/util/cgo/state.go | 38 +++ pkg/metrics/cgo_metrics.go | 86 ++++++ pkg/metrics/metrics_test.go | 1 + pkg/metrics/querynode_metrics.go | 2 + pkg/util/merr/errors.go | 8 +- pkg/util/paramtable/component_param.go | 8 +- scripts/run_go_unittest.sh | 1 + 44 files changed, 2222 insertions(+), 281 deletions(-) create mode 100644 internal/core/src/futures/CMakeLists.txt create mode 100644 internal/core/src/futures/Executor.cpp create mode 100644 internal/core/src/futures/Executor.h create mode 100644 internal/core/src/futures/Future.h create mode 100644 internal/core/src/futures/LeakyResult.h create mode 100644 internal/core/src/futures/Ready.h create mode 100644 internal/core/src/futures/future_c.cpp create mode 100644 internal/core/src/futures/future_c.h create mode 100644 internal/core/src/futures/future_c_types.h create mode 100644 internal/core/src/futures/future_test_case_c.cpp create mode 100644 internal/core/src/futures/milvus_futures.pc.in create mode 100644 internal/core/unittest/test_futures.cpp create mode 100644 internal/util/cgo/errors.go create mode 100644 internal/util/cgo/executor.go create mode 100644 internal/util/cgo/futures.go create mode 100644 internal/util/cgo/futures_test.go create mode 100644 internal/util/cgo/futures_test_case.go create mode 100644 internal/util/cgo/manager_active.go create mode 100644 internal/util/cgo/options.go create mode 100644 internal/util/cgo/pool.go create mode 100644 internal/util/cgo/state.go create mode 100644 pkg/metrics/cgo_metrics.go diff --git a/.golangci.yml b/.golangci.yml index 0d343d6e67..c2d16b88aa 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -13,6 +13,9 @@ run: - ci skip-files: - partial_search_test.go + build-tags: + - dynamic + - test linters: disable-all: true diff --git a/Makefile b/Makefile index e35b7609e7..8adf108e88 100644 --- a/Makefile +++ b/Makefile @@ -155,8 +155,9 @@ lint-fix: getdeps #TODO: Check code specifications by golangci-lint static-check: getdeps @echo "Running $@ check" - @source $(PWD)/scripts/setenv.sh && GO111MODULE=on $(INSTALL_PATH)/golangci-lint run --timeout=30m --config $(PWD)/.golangci.yml - @source $(PWD)/scripts/setenv.sh && cd pkg && GO111MODULE=on $(INSTALL_PATH)/golangci-lint run --timeout=30m --config $(PWD)/.golangci.yml + @source $(PWD)/scripts/setenv.sh && GO111MODULE=on $(INSTALL_PATH)/golangci-lint run --build-tags dynamic,test --timeout=30m --config $(PWD)/.golangci.yml + @source $(PWD)/scripts/setenv.sh && cd pkg && GO111MODULE=on $(INSTALL_PATH)/golangci-lint run --build-tags dynamic,test --timeout=30m --config $(PWD)/.golangci.yml + @source $(PWD)/scripts/setenv.sh && cd client && GO111MODULE=on $(INSTALL_PATH)/golangci-lint run --timeout=30m --config $(PWD)/client/.golangci.yml verifiers: build-cpp getdeps cppcheck fmt static-check diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 0e2b2cccf0..a30121a78e 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -489,11 +489,11 @@ dataCoord: deltalogMaxNum: 30 # The maxmum number of deltalog files to force trigger a LevelZero Compaction, default as 30 enableGarbageCollection: true gc: - interval: 3600 # gc interval in seconds - missingTolerance: 86400 # file meta missing tolerance duration in seconds, default to 24hr(1d) - dropTolerance: 10800 # file belongs to dropped entity tolerance duration in seconds. 3600 + interval: 3600 # meta-based gc scanning interval in seconds + missingTolerance: 86400 # orphan file gc tolerance duration in seconds (orphan file which last modified time before the tolerance interval ago will be deleted) + dropTolerance: 10800 # meta-based gc tolerace duration in seconds (file which meta is marked as dropped before the tolerace interval ago will be deleted) removeConcurrent: 32 # number of concurrent goroutines to remove dropped s3 objects - scanInterval: 168 # garbage collection scan residue interval in hours + scanInterval: 168 # orphan file (file on oss but has not been registered on meta) on object storage garbage collection scanning interval in hours enableActiveStandby: false brokerTimeout: 5000 # 5000ms, dataCoord broker rpc timeout autoBalance: true # Enable auto balance diff --git a/internal/core/CMakeLists.txt b/internal/core/CMakeLists.txt index 407220304a..170fce961a 100644 --- a/internal/core/CMakeLists.txt +++ b/internal/core/CMakeLists.txt @@ -305,6 +305,11 @@ install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/src/common/ FILES_MATCHING PATTERN "*_c.h" ) +install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/src/futures/ + DESTINATION include/futures + FILES_MATCHING PATTERN "*.h" +) + install(DIRECTORY ${CMAKE_BINARY_DIR}/lib/ DESTINATION ${CMAKE_INSTALL_FULL_LIBDIR} ) diff --git a/internal/core/src/CMakeLists.txt b/internal/core/src/CMakeLists.txt index c6da67afb1..cb6bd68be8 100644 --- a/internal/core/src/CMakeLists.txt +++ b/internal/core/src/CMakeLists.txt @@ -35,3 +35,4 @@ add_subdirectory( indexbuilder ) add_subdirectory( clustering ) add_subdirectory( exec ) add_subdirectory( bitset ) +add_subdirectory( futures ) diff --git a/internal/core/src/common/EasyAssert.h b/internal/core/src/common/EasyAssert.h index a7ca2f3e13..382f1a6377 100644 --- a/internal/core/src/common/EasyAssert.h +++ b/internal/core/src/common/EasyAssert.h @@ -64,8 +64,10 @@ enum ErrorCode { MemAllocateFailed = 2034, MemAllocateSizeNotMatch = 2035, MmapError = 2036, + // timeout or cancel related. + FollyOtherException = 2037, + FollyCancel = 2038, KnowhereError = 2100, - }; namespace impl { void @@ -90,7 +92,7 @@ class SegcoreError : public std::runtime_error { } ErrorCode - get_error_code() { + get_error_code() const { return error_code_; } @@ -114,9 +116,9 @@ FailureCStatus(int code, const std::string& msg) { } inline CStatus -FailureCStatus(std::exception* ex) { - if (dynamic_cast(ex) != nullptr) { - auto segcore_error = dynamic_cast(ex); +FailureCStatus(const std::exception* ex) { + if (dynamic_cast(ex) != nullptr) { + auto segcore_error = dynamic_cast(ex); return CStatus{static_cast(segcore_error->get_error_code()), strdup(ex->what())}; } diff --git a/internal/core/src/futures/CMakeLists.txt b/internal/core/src/futures/CMakeLists.txt new file mode 100644 index 0000000000..59d4bdd9f2 --- /dev/null +++ b/internal/core/src/futures/CMakeLists.txt @@ -0,0 +1,24 @@ +# Copyright (C) 2019-2020 Zilliz. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing permissions and limitations under the License + +milvus_add_pkg_config("milvus_futures") + +set(FUTURES_SRC + Executor.cpp + future_c.cpp + future_test_case_c.cpp + ) + +add_library(milvus_futures SHARED ${FUTURES_SRC}) + +target_link_libraries(milvus_futures milvus_common) + +install(TARGETS milvus_futures DESTINATION "${CMAKE_INSTALL_LIBDIR}") diff --git a/internal/core/src/futures/Executor.cpp b/internal/core/src/futures/Executor.cpp new file mode 100644 index 0000000000..b424809e0a --- /dev/null +++ b/internal/core/src/futures/Executor.cpp @@ -0,0 +1,29 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#include +#include "Executor.h" +#include "common/Common.h" + +namespace milvus::futures { + +const int kNumPriority = 3; + +folly::CPUThreadPoolExecutor* +getGlobalCPUExecutor() { + static folly::CPUThreadPoolExecutor executor( + std::thread::hardware_concurrency(), + folly::CPUThreadPoolExecutor::makeDefaultPriorityQueue(kNumPriority), + std::make_shared("MILVUS_FUTURE_CPU_")); + return &executor; +} + +}; // namespace milvus::futures \ No newline at end of file diff --git a/internal/core/src/futures/Executor.h b/internal/core/src/futures/Executor.h new file mode 100644 index 0000000000..5adfe389b3 --- /dev/null +++ b/internal/core/src/futures/Executor.h @@ -0,0 +1,30 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#pragma once + +#include +#include +#include +#include + +namespace milvus::futures { + +namespace ExecutePriority { +const int LOW = 2; +const int NORMAL = 1; +const int HIGH = 0; +} // namespace ExecutePriority + +folly::CPUThreadPoolExecutor* +getGlobalCPUExecutor(); + +}; // namespace milvus::futures diff --git a/internal/core/src/futures/Future.h b/internal/core/src/futures/Future.h new file mode 100644 index 0000000000..60eb804e96 --- /dev/null +++ b/internal/core/src/futures/Future.h @@ -0,0 +1,228 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#pragma once + +#include +#include +#include +#include +#include +#include "future_c_types.h" +#include "LeakyResult.h" +#include "Ready.h" + +namespace milvus::futures { + +/// @brief a virtual class that represents a future can be polymorphic called by CGO code. +/// implemented by Future template. +class IFuture { + public: + /// @brief cancel the future with the given exception. + /// After cancelled is called, the underlying async function will receive cancellation. + /// It just a signal notification, the cancellation is handled by user-defined. + /// If the underlying async function ignore the cancellation signal, the Future is still blocked. + virtual void + cancel() = 0; + + /// @brief check if the future is ready or canceled. + /// @return true if the future is ready or canceled, otherwise false. + virtual bool + isReady() = 0; + + /// @brief register a callback that will be called when the future is ready or future has been ready. + virtual void + registerReadyCallback(CUnlockGoMutexFn unlockFn, CLockedGoMutex* mutex) = 0; + + /// @brief get the result of the future. it must be called if future is ready. + /// the first element of the pair is the result, + /// the second element of the pair is the exception. + /// !!! It can only be called once, + /// and the result need to be manually released by caller after these call. + virtual std::pair + leakyGet() = 0; + + /// @brief leaked future object created by method `Future::createLeakedFuture` can be droped by these method. + static void + releaseLeakedFuture(IFuture* future) { + delete future; + } + + virtual ~IFuture() = default; +}; + +/// @brief a class that represents a cancellation token +class CancellationToken : public folly::CancellationToken { + public: + CancellationToken(folly::CancellationToken&& token) noexcept + : folly::CancellationToken(std::move(token)) { + } + + /// @brief check if the token is cancelled, throw a FutureCancellation exception if it is. + void + throwIfCancelled() const { + if (isCancellationRequested()) { + throw folly::FutureCancellation(); + } + } +}; + +/// @brief Future is a class that bound a future with a result for +/// using by cgo. +/// @tparam R is the return type of the producer function. +template +class Future : public IFuture { + public: + /// @brief do a async operation which will produce a result. + /// fn returns pointer to R (leaked, default memory allocator) if it is success, otherwise it will throw a exception. + /// returned result or exception will be handled by consumer side. + template >> + static std::unique_ptr> + async(folly::Executor::KeepAlive<> executor, + int priority, + Fn&& fn) noexcept { + auto future = std::make_unique>(); + // setup the interrupt handler for the promise. + future->setInterruptHandler(); + // start async function. + future->asyncProduce(executor, priority, std::forward(fn)); + // register consume callback function. + future->registerConsumeCallback(executor, priority); + return future; + } + + /// use `async`. + Future() + : ready_(std::make_shared>>()), + promise_(std::make_shared>()), + cancellation_source_() { + } + + Future(const Future&) = delete; + + Future(Future&&) noexcept = default; + + Future& + operator=(const Future&) = delete; + + Future& + operator=(Future&&) noexcept = default; + + /// @brief see `IFuture::cancel` + void + cancel() noexcept override { + promise_->getSemiFuture().cancel(); + } + + /// @brief see `IFuture::registerReadyCallback` + void + registerReadyCallback(CUnlockGoMutexFn unlockFn, + CLockedGoMutex* mutex) noexcept override { + ready_->callOrRegisterCallback( + [unlockFn = unlockFn, mutex = mutex]() { unlockFn(mutex); }); + } + + /// @brief see `IFuture::isReady` + bool + isReady() noexcept override { + return ready_->isReady(); + } + + /// @brief see `IFuture::leakyGet` + std::pair + leakyGet() noexcept override { + auto result = std::move(*ready_).getValue(); + return result.leakyGet(); + } + + private: + /// @brief set the interrupt handler for the promise used in async produce arm. + void + setInterruptHandler() { + promise_->setInterruptHandler([cancellation_source = + cancellation_source_, + ready = ready_]( + const folly::exception_wrapper& ew) { + // 1. set the result to perform a fast fail. + // 2. set the cancellation to the source to notify cancellation to the consumers. + ew.handle( + [&](const folly::FutureCancellation& e) { + cancellation_source.requestCancellation(); + }, + [&](const folly::FutureTimeout& e) { + cancellation_source.requestCancellation(); + }); + }); + } + + /// @brief do the R produce operation in async way. + template >> + void + asyncProduce(folly::Executor::KeepAlive<> executor, int priority, Fn&& fn) { + // start produce process async. + auto cancellation_token = + CancellationToken(cancellation_source_.getToken()); + auto runner = [fn = std::forward(fn), + cancellation_token = std::move(cancellation_token)]() { + cancellation_token.throwIfCancelled(); + return fn(cancellation_token); + }; + + // the runner is executed may be executed in different thread. + // so manage the promise with shared_ptr. + auto thenRunner = [promise = promise_, runner = std::move(runner)]( + auto&&) { promise->setWith(std::move(runner)); }; + folly::makeSemiFuture().via(executor, priority).then(thenRunner); + } + + /// @brief async consume the result of the future. + void + registerConsumeCallback(folly::Executor::KeepAlive<> executor, + int priority) noexcept { + // set up the result consume arm and exception consume arm. + promise_->getSemiFuture() + .via(executor, priority) + .thenValue( + [ready = ready_](R* r) { ready->setValue(LeakyResult(r)); }) + .thenError(folly::tag_t{}, + [ready = ready_](const folly::FutureCancellation& e) { + ready->setValue( + LeakyResult(milvus::FollyCancel, e.what())); + }) + .thenError(folly::tag_t{}, + [ready = ready_](const folly::FutureException& e) { + ready->setValue(LeakyResult( + milvus::FollyOtherException, e.what())); + }) + .thenError(folly::tag_t{}, + [ready = ready_](const milvus::SegcoreError& e) { + ready->setValue(LeakyResult( + static_cast(e.get_error_code()), e.what())); + }) + .thenError(folly::tag_t{}, + [ready = ready_](const std::exception& e) { + ready->setValue(LeakyResult( + milvus::UnexpectedError, e.what())); + }); + } + + private: + std::shared_ptr>> ready_; + std::shared_ptr> promise_; + folly::CancellationSource cancellation_source_; +}; + +}; // namespace milvus::futures \ No newline at end of file diff --git a/internal/core/src/futures/LeakyResult.h b/internal/core/src/futures/LeakyResult.h new file mode 100644 index 0000000000..7fbbb990b4 --- /dev/null +++ b/internal/core/src/futures/LeakyResult.h @@ -0,0 +1,112 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace milvus::futures { + +/// @brief LeakyResult is a class that holds the result that can be leaked. +/// @tparam R is a type to real result that can be leak after get operation. +template +class LeakyResult { + public: + /// @brief default construct a empty Result, which is just used for easy contruction. + LeakyResult() { + } + + /// @brief create a LeakyResult with error code and error message which means failure. + /// @param error_code see CStatus difinition. + /// @param error_msg see CStatus difinition. + LeakyResult(int error_code, const std::string& error_msg) { + auto msg = strdup(error_msg.c_str()); + status_ = std::make_optional(CStatus{error_code, msg}); + } + + /// @brief create a LeakyResult with a result which means success. + /// @param r + LeakyResult(R* r) : result_(std::make_optional(r)) { + } + + LeakyResult(const LeakyResult&) = delete; + + LeakyResult(LeakyResult&& other) noexcept { + if (other.result_.has_value()) { + result_ = std::move(other.result_); + other.result_.reset(); + } + if (other.status_.has_value()) { + status_ = std::move(other.status_); + other.status_.reset(); + } + } + + LeakyResult& + operator=(const LeakyResult&) = delete; + + LeakyResult& + operator=(LeakyResult&& other) noexcept { + if (this != &other) { + if (other.result_.has_value()) { + result_ = std::move(other.result_); + other.result_.reset(); + } + if (other.status_.has_value()) { + status_ = std::move(other.status_); + other.status_.reset(); + } + } + return *this; + } + + /// @brief get the Result or CStatus from LeakyResult, performed a manual memory management. + /// caller has responsibitiy to release if void* is not nullptr or cstatus is not nullptr. + /// @return a pair of void* and CStatus is returned, void* => R*. + /// condition (void* == nullptr and CStatus is failure) or (void* != nullptr and CStatus is success) is met. + /// release operation of CStatus see common/type_c.h. + std::pair + leakyGet() { + if (result_.has_value()) { + R* result_ptr = result_.value(); + result_.reset(); + return std::make_pair(result_ptr, + CStatus{0, nullptr}); + } + if (status_.has_value()) { + CStatus status = status_.value(); + status_.reset(); + return std::make_pair( + nullptr, CStatus{status.error_code, status.error_msg}); + } + throw std::logic_error("get on a not ready LeakyResult"); + } + + ~LeakyResult() { + if (result_.has_value()) { + delete result_.value(); + } + if (status_.has_value()) { + free((char*)(status_.value().error_msg)); + } + } + + private: + std::optional status_; + std::optional result_; +}; + +}; // namespace milvus::futures \ No newline at end of file diff --git a/internal/core/src/futures/Ready.h b/internal/core/src/futures/Ready.h new file mode 100644 index 0000000000..566b2d7857 --- /dev/null +++ b/internal/core/src/futures/Ready.h @@ -0,0 +1,97 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#pragma once + +#include +#include +#include +#include + +namespace milvus::futures { + +/// @brief Ready is a class that holds a value of type T. +/// value of Ready can be only set into ready by once, +/// and allows to register callbacks to be called when the value is ready. +template +class Ready { + public: + Ready() : is_ready_(false){}; + + Ready(const Ready&) = delete; + + Ready(Ready&&) noexcept = default; + + Ready& + operator=(const Ready&) = delete; + + Ready& + operator=(Ready&&) noexcept = default; + + /// @brief set the value into Ready. + void + setValue(T&& value) { + mutex_.lock(); + value_ = std::move(value); + is_ready_ = true; + std::vector> callbacks(std::move(callbacks_)); + mutex_.unlock(); + + // perform all callbacks which is registered before value is ready. + for (auto& callback : callbacks) { + callback(); + } + } + + /// @brief get the value from Ready. + /// @return ready value. + T + getValue() && { + std::lock_guard lock(mutex_); + if (!is_ready_) { + throw std::runtime_error("Value is not ready"); + } + auto v(std::move(value_.value())); + value_.reset(); + return std::move(v); + } + + /// @brief check if the value is ready. + bool + isReady() const { + const std::lock_guard lock(mutex_); + return is_ready_; + } + + /// @brief register a callback into Ready if value is not ready, otherwise call it directly. + template >> + void + callOrRegisterCallback(Fn&& fn) { + mutex_.lock(); + // call if value is ready, + // otherwise register as a callback to be called when value is ready. + if (is_ready_) { + mutex_.unlock(); + fn(); + return; + } + callbacks_.push_back(std::forward(fn)); + mutex_.unlock(); + } + + private: + std::optional value_; + mutable std::mutex mutex_; + std::vector> callbacks_; + bool is_ready_; +}; + +}; // namespace milvus::futures \ No newline at end of file diff --git a/internal/core/src/futures/future_c.cpp b/internal/core/src/futures/future_c.cpp new file mode 100644 index 0000000000..1221d2d653 --- /dev/null +++ b/internal/core/src/futures/future_c.cpp @@ -0,0 +1,60 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#include + +#include "future_c.h" +#include "folly/init/Init.h" +#include "Future.h" +#include "Executor.h" +#include "log/Log.h" + +extern "C" void +future_cancel(CFuture* future) { + static_cast(static_cast(future)) + ->cancel(); +} + +extern "C" bool +future_is_ready(CFuture* future) { + return static_cast(static_cast(future)) + ->isReady(); +} + +extern "C" void +future_register_ready_callback(CFuture* future, + CUnlockGoMutexFn unlockFn, + CLockedGoMutex* mutex) { + static_cast(static_cast(future)) + ->registerReadyCallback(unlockFn, mutex); +} + +extern "C" CStatus +future_leak_and_get(CFuture* future, void** result) { + auto [r, s] = + static_cast(static_cast(future)) + ->leakyGet(); + *result = r; + return s; +} + +extern "C" void +future_destroy(CFuture* future) { + milvus::futures::IFuture::releaseLeakedFuture( + static_cast(static_cast(future))); +} + +extern "C" void +executor_set_thread_num(int thread_num) { + milvus::futures::getGlobalCPUExecutor()->setNumThreads(thread_num); + LOG_INFO("future executor setup cpu executor with thread num: {}", + thread_num); +} diff --git a/internal/core/src/futures/future_c.h b/internal/core/src/futures/future_c.h new file mode 100644 index 0000000000..539f22eff1 --- /dev/null +++ b/internal/core/src/futures/future_c.h @@ -0,0 +1,47 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#pragma once + +#include "future_c_types.h" +#include "common/type_c.h" + +#ifdef __cplusplus +extern "C" { +#endif + +void +future_cancel(CFuture* future); + +bool +future_is_ready(CFuture* future); + +void +future_register_ready_callback(CFuture* future, + CUnlockGoMutexFn unlockFn, + CLockedGoMutex* mutex); + +CStatus +future_leak_and_get(CFuture* future, void** result); + +// TODO: only for testing, add test macro for this function. +CFuture* +future_create_test_case(int interval, int loop_cnt, int caseNo); + +void +future_destroy(CFuture* future); + +void +executor_set_thread_num(int thread_num); + +#ifdef __cplusplus +} +#endif diff --git a/internal/core/src/futures/future_c_types.h b/internal/core/src/futures/future_c_types.h new file mode 100644 index 0000000000..036d71c008 --- /dev/null +++ b/internal/core/src/futures/future_c_types.h @@ -0,0 +1,26 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#pragma once + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct CFuture CFuture; + +typedef struct CLockedGoMutex CLockedGoMutex; + +typedef void (*CUnlockGoMutexFn)(CLockedGoMutex* mutex); + +#ifdef __cplusplus +} +#endif diff --git a/internal/core/src/futures/future_test_case_c.cpp b/internal/core/src/futures/future_test_case_c.cpp new file mode 100644 index 0000000000..a100265665 --- /dev/null +++ b/internal/core/src/futures/future_test_case_c.cpp @@ -0,0 +1,42 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#include "Future.h" +#include "Executor.h" + +extern "C" CFuture* +future_create_test_case(int interval, int loop_cnt, int case_no) { + auto future = milvus::futures::Future::async( + milvus::futures::getGlobalCPUExecutor(), + milvus::futures::ExecutePriority::HIGH, + [interval = interval, loop_cnt = loop_cnt, case_no = case_no]( + milvus::futures::CancellationToken token) { + for (int i = 0; i < loop_cnt; i++) { + if (case_no != 0) { + token.throwIfCancelled(); + } + std::this_thread::sleep_for( + std::chrono::milliseconds(interval)); + } + switch (case_no) { + case 1: + throw std::runtime_error("case 1"); + case 2: + throw folly::FutureNoExecutor(); + case 3: + throw milvus::SegcoreError(milvus::UnexpectedError, + "case 3"); + } + return new int(case_no); + }); + return static_cast(static_cast( + static_cast(future.release()))); +} diff --git a/internal/core/src/futures/milvus_futures.pc.in b/internal/core/src/futures/milvus_futures.pc.in new file mode 100644 index 0000000000..dc75e325e8 --- /dev/null +++ b/internal/core/src/futures/milvus_futures.pc.in @@ -0,0 +1,9 @@ +libdir=@CMAKE_INSTALL_FULL_LIBDIR@ +includedir=@CMAKE_INSTALL_FULL_INCLUDEDIR@ + +Name: Milvus Futures +Description: Futures modules for Milvus +Version: @MILVUS_VERSION@ + +Libs: -L${libdir} -lmilvus_futures +Cflags: -I${includedir} diff --git a/internal/core/src/segcore/CMakeLists.txt b/internal/core/src/segcore/CMakeLists.txt index a386e83c86..7d7944eda9 100644 --- a/internal/core/src/segcore/CMakeLists.txt +++ b/internal/core/src/segcore/CMakeLists.txt @@ -42,6 +42,6 @@ set(SEGCORE_FILES check_vec_index_c.cpp) add_library(milvus_segcore SHARED ${SEGCORE_FILES}) -target_link_libraries(milvus_segcore milvus_query milvus_bitset milvus_exec ${OpenMP_CXX_FLAGS} milvus-storage) +target_link_libraries(milvus_segcore milvus_query milvus_bitset milvus_exec ${OpenMP_CXX_FLAGS} milvus-storage milvus_futures) install(TARGETS milvus_segcore DESTINATION "${CMAKE_INSTALL_LIBDIR}") diff --git a/internal/core/src/segcore/segment_c.cpp b/internal/core/src/segcore/segment_c.cpp index 06643ea3f7..e662c22181 100644 --- a/internal/core/src/segcore/segment_c.cpp +++ b/internal/core/src/segcore/segment_c.cpp @@ -27,6 +27,8 @@ #include "segcore/SegmentSealedImpl.h" #include "segcore/Utils.h" #include "storage/Util.h" +#include "futures/Future.h" +#include "futures/Executor.h" #include "storage/space.h" ////////////////////////////// common interfaces ////////////////////////////// @@ -82,113 +84,129 @@ DeleteSearchResult(CSearchResult search_result) { delete res; } -CStatus -Search(CTraceContext c_trace, - CSegmentInterface c_segment, - CSearchPlan c_plan, - CPlaceholderGroup c_placeholder_group, - uint64_t timestamp, - CSearchResult* result) { - try { - auto segment = (milvus::segcore::SegmentInterface*)c_segment; - auto plan = (milvus::query::Plan*)c_plan; - auto phg_ptr = reinterpret_cast( - c_placeholder_group); +CFuture* // Future +AsyncSearch(CTraceContext c_trace, + CSegmentInterface c_segment, + CSearchPlan c_plan, + CPlaceholderGroup c_placeholder_group, + uint64_t timestamp) { + auto segment = (milvus::segcore::SegmentInterface*)c_segment; + auto plan = (milvus::query::Plan*)c_plan; + auto phg_ptr = reinterpret_cast( + c_placeholder_group); - // save trace context into search_info - auto& trace_ctx = plan->plan_node_->search_info_.trace_ctx_; - trace_ctx.traceID = c_trace.traceID; - trace_ctx.spanID = c_trace.spanID; - trace_ctx.traceFlags = c_trace.traceFlags; + auto future = milvus::futures::Future::async( + milvus::futures::getGlobalCPUExecutor(), + milvus::futures::ExecutePriority::HIGH, + [c_trace, segment, plan, phg_ptr, timestamp]( + milvus::futures::CancellationToken cancel_token) { + // save trace context into search_info + auto& trace_ctx = plan->plan_node_->search_info_.trace_ctx_; + trace_ctx.traceID = c_trace.traceID; + trace_ctx.spanID = c_trace.spanID; + trace_ctx.traceFlags = c_trace.traceFlags; - auto span = milvus::tracer::StartSpan("SegCoreSearch", &trace_ctx); - milvus::tracer::SetRootSpan(span); + auto span = milvus::tracer::StartSpan("SegCoreSearch", &trace_ctx); + milvus::tracer::SetRootSpan(span); - auto search_result = segment->Search(plan, phg_ptr, timestamp); - if (!milvus::PositivelyRelated( - plan->plan_node_->search_info_.metric_type_)) { - for (auto& dis : search_result->distances_) { - dis *= -1; + auto search_result = segment->Search(plan, phg_ptr, timestamp); + if (!milvus::PositivelyRelated( + plan->plan_node_->search_info_.metric_type_)) { + for (auto& dis : search_result->distances_) { + dis *= -1; + } } - } - *result = search_result.release(); - span->End(); - milvus::tracer::CloseRootSpan(); - return milvus::SuccessCStatus(); - } catch (std::exception& e) { - return milvus::FailureCStatus(&e); - } + span->End(); + milvus::tracer::CloseRootSpan(); + return search_result.release(); + }); + return static_cast(static_cast( + static_cast(future.release()))); } void DeleteRetrieveResult(CRetrieveResult* retrieve_result) { - std::free(const_cast(retrieve_result->proto_blob)); + delete[] static_cast( + const_cast(retrieve_result->proto_blob)); + delete retrieve_result; } -CStatus -Retrieve(CTraceContext c_trace, - CSegmentInterface c_segment, - CRetrievePlan c_plan, - uint64_t timestamp, - CRetrieveResult* result, - int64_t limit_size, - bool ignore_non_pk) { +/// Create a leaked CRetrieveResult from a proto. +/// Should be released by DeleteRetrieveResult. +CRetrieveResult* +CreateLeakedCRetrieveResultFromProto( + std::unique_ptr retrieve_result) { + auto size = retrieve_result->ByteSizeLong(); + auto buffer = new uint8_t[size]; try { - auto segment = - static_cast(c_segment); - auto plan = static_cast(c_plan); - - auto trace_ctx = milvus::tracer::TraceContext{ - c_trace.traceID, c_trace.spanID, c_trace.traceFlags}; - milvus::tracer::AutoSpan span("SegCoreRetrieve", &trace_ctx, true); - - auto retrieve_result = segment->Retrieve( - &trace_ctx, plan, timestamp, limit_size, ignore_non_pk); - - auto size = retrieve_result->ByteSizeLong(); - std::unique_ptr buffer(new uint8_t[size]); - retrieve_result->SerializePartialToArray(buffer.get(), size); - - result->proto_blob = buffer.release(); - result->proto_size = size; - - return milvus::SuccessCStatus(); + retrieve_result->SerializePartialToArray(buffer, size); } catch (std::exception& e) { - return milvus::FailureCStatus(&e); + delete[] buffer; + throw; } + + auto result = new CRetrieveResult(); + result->proto_blob = buffer; + result->proto_size = size; + return result; } -CStatus -RetrieveByOffsets(CTraceContext c_trace, - CSegmentInterface c_segment, - CRetrievePlan c_plan, - CRetrieveResult* result, - int64_t* offsets, - int64_t len) { - try { - auto segment = - static_cast(c_segment); - auto plan = static_cast(c_plan); +CFuture* // Future +AsyncRetrieve(CTraceContext c_trace, + CSegmentInterface c_segment, + CRetrievePlan c_plan, + uint64_t timestamp, + int64_t limit_size, + bool ignore_non_pk) { + auto segment = static_cast(c_segment); + auto plan = static_cast(c_plan); - auto trace_ctx = milvus::tracer::TraceContext{ - c_trace.traceID, c_trace.spanID, c_trace.traceFlags}; - milvus::tracer::AutoSpan span( - "SegCoreRetrieveByOffsets", &trace_ctx, true); + auto future = milvus::futures::Future::async( + milvus::futures::getGlobalCPUExecutor(), + milvus::futures::ExecutePriority::HIGH, + [c_trace, segment, plan, timestamp, limit_size, ignore_non_pk]( + milvus::futures::CancellationToken cancel_token) { + auto trace_ctx = milvus::tracer::TraceContext{ + c_trace.traceID, c_trace.spanID, c_trace.traceFlags}; + milvus::tracer::AutoSpan span("SegCoreRetrieve", &trace_ctx, true); - auto retrieve_result = - segment->Retrieve(&trace_ctx, plan, offsets, len); + auto retrieve_result = segment->Retrieve( + &trace_ctx, plan, timestamp, limit_size, ignore_non_pk); - auto size = retrieve_result->ByteSizeLong(); - std::unique_ptr buffer(new uint8_t[size]); - retrieve_result->SerializePartialToArray(buffer.get(), size); + return CreateLeakedCRetrieveResultFromProto( + std::move(retrieve_result)); + }); + return static_cast(static_cast( + static_cast(future.release()))); +} - result->proto_blob = buffer.release(); - result->proto_size = size; +CFuture* // Future +AsyncRetrieveByOffsets(CTraceContext c_trace, + CSegmentInterface c_segment, + CRetrievePlan c_plan, + int64_t* offsets, + int64_t len) { + auto segment = static_cast(c_segment); + auto plan = static_cast(c_plan); - return milvus::SuccessCStatus(); - } catch (std::exception& e) { - return milvus::FailureCStatus(&e); - } + auto future = milvus::futures::Future::async( + milvus::futures::getGlobalCPUExecutor(), + milvus::futures::ExecutePriority::HIGH, + [c_trace, segment, plan, offsets, len]( + milvus::futures::CancellationToken cancel_token) { + auto trace_ctx = milvus::tracer::TraceContext{ + c_trace.traceID, c_trace.spanID, c_trace.traceFlags}; + milvus::tracer::AutoSpan span( + "SegCoreRetrieveByOffsets", &trace_ctx, true); + + auto retrieve_result = + segment->Retrieve(&trace_ctx, plan, offsets, len); + + return CreateLeakedCRetrieveResultFromProto( + std::move(retrieve_result)); + }); + return static_cast(static_cast( + static_cast(future.release()))); } int64_t diff --git a/internal/core/src/segcore/segment_c.h b/internal/core/src/segcore/segment_c.h index e971c86d5b..ec25518348 100644 --- a/internal/core/src/segcore/segment_c.h +++ b/internal/core/src/segcore/segment_c.h @@ -20,6 +20,7 @@ extern "C" { #include #include "common/type_c.h" +#include "futures/future_c.h" #include "segcore/plan_c.h" #include "segcore/load_index_c.h" #include "segcore/load_field_data_c.h" @@ -43,33 +44,30 @@ ClearSegmentData(CSegmentInterface c_segment); void DeleteSearchResult(CSearchResult search_result); -CStatus -Search(CTraceContext c_trace, - CSegmentInterface c_segment, - CSearchPlan c_plan, - CPlaceholderGroup c_placeholder_group, - uint64_t timestamp, - CSearchResult* result); +CFuture* // Future +AsyncSearch(CTraceContext c_trace, + CSegmentInterface c_segment, + CSearchPlan c_plan, + CPlaceholderGroup c_placeholder_group, + uint64_t timestamp); void DeleteRetrieveResult(CRetrieveResult* retrieve_result); -CStatus -Retrieve(CTraceContext c_trace, - CSegmentInterface c_segment, - CRetrievePlan c_plan, - uint64_t timestamp, - CRetrieveResult* result, - int64_t limit_size, - bool ignore_non_pk); +CFuture* // Future +AsyncRetrieve(CTraceContext c_trace, + CSegmentInterface c_segment, + CRetrievePlan c_plan, + uint64_t timestamp, + int64_t limit_size, + bool ignore_non_pk); -CStatus -RetrieveByOffsets(CTraceContext c_trace, - CSegmentInterface c_segment, - CRetrievePlan c_plan, - CRetrieveResult* result, - int64_t* offsets, - int64_t len); +CFuture* // Future +AsyncRetrieveByOffsets(CTraceContext c_trace, + CSegmentInterface c_segment, + CRetrievePlan c_plan, + int64_t* offsets, + int64_t len); int64_t GetMemoryUsageInBytes(CSegmentInterface c_segment); diff --git a/internal/core/unittest/CMakeLists.txt b/internal/core/unittest/CMakeLists.txt index 191e3c5ccf..5801466191 100644 --- a/internal/core/unittest/CMakeLists.txt +++ b/internal/core/unittest/CMakeLists.txt @@ -69,6 +69,7 @@ set(MILVUS_TEST_FILES test_array_inverted_index.cpp test_chunk_vector.cpp test_mmap_chunk_manager.cpp + test_futures.cpp ) if ( INDEX_ENGINE STREQUAL "cardinal" ) diff --git a/internal/core/unittest/init_gtest.cpp b/internal/core/unittest/init_gtest.cpp index 3633a86f82..adc1b3b683 100644 --- a/internal/core/unittest/init_gtest.cpp +++ b/internal/core/unittest/init_gtest.cpp @@ -11,6 +11,7 @@ #include +#include "folly/init/Init.h" #include "test_utils/Constants.h" #include "storage/LocalChunkManagerSingleton.h" #include "storage/RemoteChunkManagerSingleton.h" @@ -19,6 +20,8 @@ int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); + folly::Init follyInit(&argc, &argv, false); + milvus::storage::LocalChunkManagerSingleton::GetInstance().Init( TestLocalPath); milvus::storage::RemoteChunkManagerSingleton::GetInstance().Init( diff --git a/internal/core/unittest/test_c_api.cpp b/internal/core/unittest/test_c_api.cpp index 0297c5e334..df6a8bf494 100644 --- a/internal/core/unittest/test_c_api.cpp +++ b/internal/core/unittest/test_c_api.cpp @@ -34,6 +34,7 @@ #include "segcore/Reduce.h" #include "segcore/reduce_c.h" #include "segcore/segment_c.h" +#include "futures/Future.h" #include "test_utils/DataGen.h" #include "test_utils/PbHelper.h" #include "test_utils/indexbuilder_test_utils.h" @@ -64,14 +65,50 @@ CStatus CRetrieve(CSegmentInterface c_segment, CRetrievePlan c_plan, uint64_t timestamp, - CRetrieveResult* result) { - return Retrieve({}, - c_segment, - c_plan, - timestamp, - result, - DEFAULT_MAX_OUTPUT_SIZE, - false); + CRetrieveResult** result) { + auto future = AsyncRetrieve( + {}, c_segment, c_plan, timestamp, DEFAULT_MAX_OUTPUT_SIZE, false); + auto futurePtr = static_cast( + static_cast(static_cast(future))); + + std::mutex mu; + mu.lock(); + futurePtr->registerReadyCallback( + [](CLockedGoMutex* mutex) { ((std::mutex*)(mutex))->unlock(); }, + (CLockedGoMutex*)(&mu)); + mu.lock(); + + auto [retrieveResult, status] = futurePtr->leakyGet(); + if (status.error_code != 0) { + return status; + } + *result = static_cast(retrieveResult); + return status; +} + +CStatus +CRetrieveByOffsets(CSegmentInterface c_segment, + CRetrievePlan c_plan, + int64_t* offsets, + int64_t len, + CRetrieveResult** result) { + auto future = AsyncRetrieveByOffsets({}, c_segment, c_plan, offsets, len); + auto futurePtr = static_cast( + static_cast(static_cast(future))); + + std::mutex mu; + mu.lock(); + futurePtr->registerReadyCallback( + [](CLockedGoMutex* mutex) { ((std::mutex*)(mutex))->unlock(); }, + (CLockedGoMutex*)(&mu)); + mu.lock(); + + auto [retrieveResult, status] = futurePtr->leakyGet(); + if (status.error_code != 0) { + return status; + } + *result = static_cast(retrieveResult); + return status; } const char* @@ -609,15 +646,16 @@ TEST(CApiTest, MultiDeleteGrowingSegment) { plan->field_ids_ = target_field_ids; auto max_ts = dataset.timestamps_[N - 1] + 10; - CRetrieveResult retrieve_result; + CRetrieveResult* retrieve_result = nullptr; res = CRetrieve(segment, plan.get(), max_ts, &retrieve_result); ASSERT_EQ(res.error_code, Success); auto query_result = std::make_unique(); - auto suc = query_result->ParseFromArray(retrieve_result.proto_blob, - retrieve_result.proto_size); + auto suc = query_result->ParseFromArray(retrieve_result->proto_blob, + retrieve_result->proto_size); ASSERT_TRUE(suc); ASSERT_EQ(query_result->ids().int_id().data().size(), 0); - DeleteRetrieveResult(&retrieve_result); + DeleteRetrieveResult(retrieve_result); + retrieve_result = nullptr; // retrieve pks = {2} { @@ -633,11 +671,12 @@ TEST(CApiTest, MultiDeleteGrowingSegment) { std::make_shared(DEFAULT_PLANNODE_ID, term_expr); res = CRetrieve(segment, plan.get(), max_ts, &retrieve_result); ASSERT_EQ(res.error_code, Success); - suc = query_result->ParseFromArray(retrieve_result.proto_blob, - retrieve_result.proto_size); + suc = query_result->ParseFromArray(retrieve_result->proto_blob, + retrieve_result->proto_size); ASSERT_TRUE(suc); ASSERT_EQ(query_result->ids().int_id().data().size(), 1); - DeleteRetrieveResult(&retrieve_result); + DeleteRetrieveResult(retrieve_result); + retrieve_result = nullptr; // delete pks = {2} delete_pks = {2}; @@ -658,13 +697,13 @@ TEST(CApiTest, MultiDeleteGrowingSegment) { // retrieve pks in {2} res = CRetrieve(segment, plan.get(), max_ts, &retrieve_result); ASSERT_EQ(res.error_code, Success); - suc = query_result->ParseFromArray(retrieve_result.proto_blob, - retrieve_result.proto_size); + suc = query_result->ParseFromArray(retrieve_result->proto_blob, + retrieve_result->proto_size); ASSERT_TRUE(suc); ASSERT_EQ(query_result->ids().int_id().data().size(), 0); DeleteRetrievePlan(plan.release()); - DeleteRetrieveResult(&retrieve_result); + DeleteRetrieveResult(retrieve_result); DeleteCollection(collection); DeleteSegment(segment); @@ -721,15 +760,16 @@ TEST(CApiTest, MultiDeleteSealedSegment) { plan->field_ids_ = target_field_ids; auto max_ts = dataset.timestamps_[N - 1] + 10; - CRetrieveResult retrieve_result; + CRetrieveResult* retrieve_result = nullptr; auto res = CRetrieve(segment, plan.get(), max_ts, &retrieve_result); ASSERT_EQ(res.error_code, Success); auto query_result = std::make_unique(); - auto suc = query_result->ParseFromArray(retrieve_result.proto_blob, - retrieve_result.proto_size); + auto suc = query_result->ParseFromArray(retrieve_result->proto_blob, + retrieve_result->proto_size); ASSERT_TRUE(suc); ASSERT_EQ(query_result->ids().int_id().data().size(), 0); - DeleteRetrieveResult(&retrieve_result); + DeleteRetrieveResult(retrieve_result); + retrieve_result = nullptr; // retrieve pks = {2} { @@ -745,11 +785,12 @@ TEST(CApiTest, MultiDeleteSealedSegment) { std::make_shared(DEFAULT_PLANNODE_ID, term_expr); res = CRetrieve(segment, plan.get(), max_ts, &retrieve_result); ASSERT_EQ(res.error_code, Success); - suc = query_result->ParseFromArray(retrieve_result.proto_blob, - retrieve_result.proto_size); + suc = query_result->ParseFromArray(retrieve_result->proto_blob, + retrieve_result->proto_size); ASSERT_TRUE(suc); ASSERT_EQ(query_result->ids().int_id().data().size(), 1); - DeleteRetrieveResult(&retrieve_result); + DeleteRetrieveResult(retrieve_result); + retrieve_result = nullptr; // delete pks = {2} delete_pks = {2}; @@ -770,13 +811,13 @@ TEST(CApiTest, MultiDeleteSealedSegment) { // retrieve pks in {2} res = CRetrieve(segment, plan.get(), max_ts, &retrieve_result); ASSERT_EQ(res.error_code, Success); - suc = query_result->ParseFromArray(retrieve_result.proto_blob, - retrieve_result.proto_size); + suc = query_result->ParseFromArray(retrieve_result->proto_blob, + retrieve_result->proto_size); ASSERT_TRUE(suc); ASSERT_EQ(query_result->ids().int_id().data().size(), 0); DeleteRetrievePlan(plan.release()); - DeleteRetrieveResult(&retrieve_result); + DeleteRetrieveResult(retrieve_result); DeleteCollection(collection); DeleteSegment(segment); @@ -839,16 +880,17 @@ TEST(CApiTest, DeleteRepeatedPksFromGrowingSegment) { std::vector target_field_ids{FieldId(100), FieldId(101)}; plan->field_ids_ = target_field_ids; - CRetrieveResult retrieve_result; + CRetrieveResult* retrieve_result = nullptr; res = CRetrieve( segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result); ASSERT_EQ(res.error_code, Success); auto query_result = std::make_unique(); - auto suc = query_result->ParseFromArray(retrieve_result.proto_blob, - retrieve_result.proto_size); + auto suc = query_result->ParseFromArray(retrieve_result->proto_blob, + retrieve_result->proto_size); ASSERT_TRUE(suc); ASSERT_EQ(query_result->ids().int_id().data().size(), 3); - DeleteRetrieveResult(&retrieve_result); + DeleteRetrieveResult(retrieve_result); + retrieve_result = nullptr; // delete data pks = {1, 2, 3} std::vector delete_row_ids = {1, 2, 3}; @@ -873,13 +915,14 @@ TEST(CApiTest, DeleteRepeatedPksFromGrowingSegment) { ASSERT_EQ(res.error_code, Success); query_result = std::make_unique(); - suc = query_result->ParseFromArray(retrieve_result.proto_blob, - retrieve_result.proto_size); + suc = query_result->ParseFromArray(retrieve_result->proto_blob, + retrieve_result->proto_size); ASSERT_TRUE(suc); ASSERT_EQ(query_result->ids().int_id().data().size(), 0); DeleteRetrievePlan(plan.release()); - DeleteRetrieveResult(&retrieve_result); + DeleteRetrieveResult(retrieve_result); + retrieve_result = nullptr; DeleteCollection(collection); DeleteSegment(segment); @@ -920,16 +963,17 @@ TEST(CApiTest, DeleteRepeatedPksFromSealedSegment) { std::vector target_field_ids{FieldId(100), FieldId(101)}; plan->field_ids_ = target_field_ids; - CRetrieveResult retrieve_result; + CRetrieveResult* retrieve_result = nullptr; auto res = CRetrieve( segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result); ASSERT_EQ(res.error_code, Success); auto query_result = std::make_unique(); - auto suc = query_result->ParseFromArray(retrieve_result.proto_blob, - retrieve_result.proto_size); + auto suc = query_result->ParseFromArray(retrieve_result->proto_blob, + retrieve_result->proto_size); ASSERT_TRUE(suc); ASSERT_EQ(query_result->ids().int_id().data().size(), 6); - DeleteRetrieveResult(&retrieve_result); + DeleteRetrieveResult(retrieve_result); + retrieve_result = nullptr; // delete data pks = {1, 2, 3} std::vector delete_row_ids = {1, 2, 3}; @@ -955,13 +999,13 @@ TEST(CApiTest, DeleteRepeatedPksFromSealedSegment) { ASSERT_EQ(res.error_code, Success); query_result = std::make_unique(); - suc = query_result->ParseFromArray(retrieve_result.proto_blob, - retrieve_result.proto_size); + suc = query_result->ParseFromArray(retrieve_result->proto_blob, + retrieve_result->proto_size); ASSERT_TRUE(suc); ASSERT_EQ(query_result->ids().int_id().data().size(), 0); DeleteRetrievePlan(plan.release()); - DeleteRetrieveResult(&retrieve_result); + DeleteRetrieveResult(retrieve_result); DeleteCollection(collection); DeleteSegment(segment); @@ -1030,16 +1074,17 @@ TEST(CApiTest, InsertSamePkAfterDeleteOnGrowingSegment) { std::vector target_field_ids{FieldId(100), FieldId(101)}; plan->field_ids_ = target_field_ids; - CRetrieveResult retrieve_result; + CRetrieveResult* retrieve_result = nullptr; res = CRetrieve( segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result); ASSERT_EQ(res.error_code, Success); auto query_result = std::make_unique(); - auto suc = query_result->ParseFromArray(retrieve_result.proto_blob, - retrieve_result.proto_size); + auto suc = query_result->ParseFromArray(retrieve_result->proto_blob, + retrieve_result->proto_size); ASSERT_TRUE(suc); ASSERT_EQ(query_result->ids().int_id().data().size(), 0); - DeleteRetrieveResult(&retrieve_result); + DeleteRetrieveResult(retrieve_result); + retrieve_result = nullptr; // second insert data // insert data with pks = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9} , timestamps = {10, 11, 12, 13, 14, 15, 16, 17, 18, 19} @@ -1061,13 +1106,13 @@ TEST(CApiTest, InsertSamePkAfterDeleteOnGrowingSegment) { ASSERT_EQ(res.error_code, Success); query_result = std::make_unique(); - suc = query_result->ParseFromArray(retrieve_result.proto_blob, - retrieve_result.proto_size); + suc = query_result->ParseFromArray(retrieve_result->proto_blob, + retrieve_result->proto_size); ASSERT_TRUE(suc); ASSERT_EQ(query_result->ids().int_id().data().size(), 3); DeleteRetrievePlan(plan.release()); - DeleteRetrieveResult(&retrieve_result); + DeleteRetrieveResult(retrieve_result); DeleteCollection(collection); DeleteSegment(segment); @@ -1127,18 +1172,19 @@ TEST(CApiTest, InsertSamePkAfterDeleteOnSealedSegment) { std::vector target_field_ids{FieldId(100), FieldId(101)}; plan->field_ids_ = target_field_ids; - CRetrieveResult retrieve_result; + CRetrieveResult* retrieve_result = nullptr; auto res = CRetrieve( segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result); ASSERT_EQ(res.error_code, Success); auto query_result = std::make_unique(); - auto suc = query_result->ParseFromArray(retrieve_result.proto_blob, - retrieve_result.proto_size); + auto suc = query_result->ParseFromArray(retrieve_result->proto_blob, + retrieve_result->proto_size); ASSERT_TRUE(suc); ASSERT_EQ(query_result->ids().int_id().data().size(), 4); DeleteRetrievePlan(plan.release()); - DeleteRetrieveResult(&retrieve_result); + DeleteRetrieveResult(retrieve_result); + retrieve_result = nullptr; DeleteCollection(collection); DeleteSegment(segment); @@ -1324,13 +1370,21 @@ TEST(CApiTest, RetrieveTestWithExpr) { std::vector target_field_ids{FieldId(100), FieldId(101)}; plan->field_ids_ = target_field_ids; - CRetrieveResult retrieve_result; + CRetrieveResult* retrieve_result = nullptr; auto res = CRetrieve( segment, plan.get(), dataset.timestamps_[0], &retrieve_result); ASSERT_EQ(res.error_code, Success); + // Test Retrieve by offsets. + int64_t offsets[] = {0, 1, 2}; + CRetrieveResult* retrieve_by_offsets_result = nullptr; + res = CRetrieveByOffsets( + segment, plan.get(), offsets, 3, &retrieve_by_offsets_result); + ASSERT_EQ(res.error_code, Success); + DeleteRetrievePlan(plan.release()); - DeleteRetrieveResult(&retrieve_result); + DeleteRetrieveResult(retrieve_result); + DeleteRetrieveResult(retrieve_by_offsets_result); DeleteCollection(collection); DeleteSegment(segment); } @@ -4324,13 +4378,13 @@ TEST(CApiTest, RetriveScalarFieldFromSealedSegmentWithIndex) { i8_fid, i16_fid, i32_fid, i64_fid, float_fid, double_fid}; plan->field_ids_ = target_field_ids; - CRetrieveResult retrieve_result; + CRetrieveResult* retrieve_result = nullptr; res = CRetrieve( segment, plan.get(), raw_data.timestamps_[N - 1], &retrieve_result); ASSERT_EQ(res.error_code, Success); auto query_result = std::make_unique(); - auto suc = query_result->ParseFromArray(retrieve_result.proto_blob, - retrieve_result.proto_size); + auto suc = query_result->ParseFromArray(retrieve_result->proto_blob, + retrieve_result->proto_size); ASSERT_TRUE(suc); ASSERT_EQ(query_result->fields_data().size(), 6); auto fields_data = query_result->fields_data(); @@ -4369,7 +4423,7 @@ TEST(CApiTest, RetriveScalarFieldFromSealedSegmentWithIndex) { } DeleteRetrievePlan(plan.release()); - DeleteRetrieveResult(&retrieve_result); + DeleteRetrieveResult(retrieve_result); DeleteSegment(segment); } diff --git a/internal/core/unittest/test_futures.cpp b/internal/core/unittest/test_futures.cpp new file mode 100644 index 0000000000..671cffc72a --- /dev/null +++ b/internal/core/unittest/test_futures.cpp @@ -0,0 +1,211 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#include +#include "futures/Future.h" +#include +#include +#include +#include + +using namespace milvus::futures; + +TEST(Futures, LeakyResult) { + { + LeakyResult leaky_result; + ASSERT_ANY_THROW(leaky_result.leakyGet()); + } + + { + auto leaky_result = LeakyResult(1, "error"); + auto [r, s] = leaky_result.leakyGet(); + ASSERT_EQ(r, nullptr); + ASSERT_EQ(s.error_code, 1); + ASSERT_STREQ(s.error_msg, "error"); + free((char*)(s.error_msg)); + } + { + auto leaky_result = LeakyResult(new int(1)); + auto [r, s] = leaky_result.leakyGet(); + ASSERT_NE(r, nullptr); + ASSERT_EQ(*(int*)(r), 1); + ASSERT_EQ(s.error_code, 0); + ASSERT_EQ(s.error_msg, nullptr); + delete (int*)(r); + } + { + LeakyResult leaky_result(1, "error"); + LeakyResult leaky_result_moved(std::move(leaky_result)); + auto [r, s] = leaky_result_moved.leakyGet(); + ASSERT_EQ(r, nullptr); + ASSERT_EQ(s.error_code, 1); + ASSERT_STREQ(s.error_msg, "error"); + free((char*)(s.error_msg)); + } + { + LeakyResult leaky_result(1, "error"); + LeakyResult leaky_result_moved; + leaky_result_moved = std::move(leaky_result); + auto [r, s] = leaky_result_moved.leakyGet(); + ASSERT_EQ(r, nullptr); + ASSERT_EQ(s.error_code, 1); + ASSERT_STREQ(s.error_msg, "error"); + free((char*)(s.error_msg)); + } +} + +TEST(Futures, Ready) { + Ready ready; + int a = 0; + ready.callOrRegisterCallback([&a]() { a++; }); + ASSERT_EQ(a, 0); + ASSERT_FALSE(ready.isReady()); + ready.setValue(1); + ASSERT_EQ(a, 1); + ASSERT_TRUE(ready.isReady()); + ready.callOrRegisterCallback([&a]() { a++; }); + ASSERT_EQ(a, 2); + + ASSERT_EQ(std::move(ready).getValue(), 1); +} + +TEST(Futures, Future) { + folly::CPUThreadPoolExecutor executor(2); + + // success path. + { + // try a async function + auto future = milvus::futures::Future::async( + &executor, 0, [](milvus::futures::CancellationToken token) { + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + return new int(1); + }); + ASSERT_FALSE(future->isReady()); + + std::mutex mu; + mu.lock(); + future->registerReadyCallback( + [](CLockedGoMutex* mutex) { ((std::mutex*)(mutex))->unlock(); }, + (CLockedGoMutex*)(&mu)); + mu.lock(); + ASSERT_TRUE(future->isReady()); + auto [r, s] = future->leakyGet(); + + ASSERT_NE(r, nullptr); + ASSERT_EQ(*(int*)(r), 1); + ASSERT_EQ(s.error_code, 0); + ASSERT_EQ(s.error_msg, nullptr); + delete (int*)(r); + } + + // error path. + { + // try a async function + auto future = milvus::futures::Future::async( + &executor, 0, [](milvus::futures::CancellationToken token) { + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + throw milvus::SegcoreError(milvus::NotImplemented, + "unimplemented"); + return new int(1); + }); + ASSERT_FALSE(future->isReady()); + + std::mutex mu; + mu.lock(); + future->registerReadyCallback( + [](CLockedGoMutex* mutex) { ((std::mutex*)(mutex))->unlock(); }, + (CLockedGoMutex*)(&mu)); + mu.lock(); + ASSERT_TRUE(future->isReady()); + auto [r, s] = future->leakyGet(); + + ASSERT_EQ(r, nullptr); + ASSERT_EQ(s.error_code, milvus::NotImplemented); + ASSERT_STREQ(s.error_msg, "unimplemented"); + free((char*)(s.error_msg)); + } + + { + // try a async function + auto future = milvus::futures::Future::async( + &executor, 0, [](milvus::futures::CancellationToken token) { + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + throw std::runtime_error("unimplemented"); + return new int(1); + }); + ASSERT_FALSE(future->isReady()); + + std::mutex mu; + mu.lock(); + future->registerReadyCallback( + [](CLockedGoMutex* mutex) { ((std::mutex*)(mutex))->unlock(); }, + (CLockedGoMutex*)(&mu)); + mu.lock(); + ASSERT_TRUE(future->isReady()); + auto [r, s] = future->leakyGet(); + + ASSERT_EQ(r, nullptr); + ASSERT_EQ(s.error_code, milvus::UnexpectedError); + free((char*)(s.error_msg)); + } + + { + // try a async function + auto future = milvus::futures::Future::async( + &executor, 0, [](milvus::futures::CancellationToken token) { + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + throw folly::FutureNotReady(); + return new int(1); + }); + ASSERT_FALSE(future->isReady()); + + std::mutex mu; + mu.lock(); + future->registerReadyCallback( + [](CLockedGoMutex* mutex) { ((std::mutex*)(mutex))->unlock(); }, + (CLockedGoMutex*)(&mu)); + mu.lock(); + ASSERT_TRUE(future->isReady()); + auto [r, s] = future->leakyGet(); + + ASSERT_EQ(r, nullptr); + ASSERT_EQ(s.error_code, milvus::FollyOtherException); + free((char*)(s.error_msg)); + } + + // cancellation path. + { + // try a async function + auto future = milvus::futures::Future::async( + &executor, 0, [](milvus::futures::CancellationToken token) { + for (int i = 0; i < 10; i++) { + token.throwIfCancelled(); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + return new int(1); + }); + ASSERT_FALSE(future->isReady()); + future->cancel(); + + std::mutex mu; + mu.lock(); + future->registerReadyCallback( + [](CLockedGoMutex* mutex) { ((std::mutex*)(mutex))->unlock(); }, + (CLockedGoMutex*)(&mu)); + mu.lock(); + ASSERT_TRUE(future->isReady()); + auto [r, s] = future->leakyGet(); + + ASSERT_EQ(r, nullptr); + ASSERT_EQ(s.error_code, milvus::FollyCancel); + free((char*)(s.error_msg)); + } +} \ No newline at end of file diff --git a/internal/core/unittest/test_group_by.cpp b/internal/core/unittest/test_group_by.cpp index 59e2ef0dda..1295230f25 100644 --- a/internal/core/unittest/test_group_by.cpp +++ b/internal/core/unittest/test_group_by.cpp @@ -609,10 +609,10 @@ TEST(GroupBY, Reduce) { CSearchResult c_search_res_1; CSearchResult c_search_res_2; auto status = - Search({}, c_segment_1, c_plan, c_ph_group, 1L << 63, &c_search_res_1); + CSearch(c_segment_1, c_plan, c_ph_group, 1L << 63, &c_search_res_1); ASSERT_EQ(status.error_code, Success); status = - Search({}, c_segment_2, c_plan, c_ph_group, 1L << 63, &c_search_res_2); + CSearch(c_segment_2, c_plan, c_ph_group, 1L << 63, &c_search_res_2); ASSERT_EQ(status.error_code, Success); std::vector results; results.push_back(c_search_res_1); diff --git a/internal/core/unittest/test_utils/c_api_test_utils.h b/internal/core/unittest/test_utils/c_api_test_utils.h index 225a00e6b3..cabf6ec432 100644 --- a/internal/core/unittest/test_utils/c_api_test_utils.h +++ b/internal/core/unittest/test_utils/c_api_test_utils.h @@ -28,6 +28,7 @@ #include "segcore/Reduce.h" #include "segcore/reduce_c.h" #include "segcore/segment_c.h" +#include "futures/Future.h" #include "DataGen.h" #include "PbHelper.h" #include "c_api_test_utils.h" @@ -147,8 +148,24 @@ CSearch(CSegmentInterface c_segment, CPlaceholderGroup c_placeholder_group, uint64_t timestamp, CSearchResult* result) { - return Search( - {}, c_segment, c_plan, c_placeholder_group, timestamp, result); + auto future = + AsyncSearch({}, c_segment, c_plan, c_placeholder_group, timestamp); + auto futurePtr = static_cast( + static_cast(static_cast(future))); + + std::mutex mu; + mu.lock(); + futurePtr->registerReadyCallback( + [](CLockedGoMutex* mutex) { ((std::mutex*)(mutex))->unlock(); }, + (CLockedGoMutex*)(&mu)); + mu.lock(); + + auto [searchResult, status] = futurePtr->leakyGet(); + if (status.error_code != 0) { + return status; + } + *result = static_cast(searchResult); + return status; } } // namespace diff --git a/internal/querynodev2/segments/cgo_util.go b/internal/querynodev2/segments/cgo_util.go index 2e8c32e402..f82d25ae29 100644 --- a/internal/querynodev2/segments/cgo_util.go +++ b/internal/querynodev2/segments/cgo_util.go @@ -28,6 +28,7 @@ import "C" import ( "context" + "math" "unsafe" "github.com/golang/protobuf/proto" @@ -55,14 +56,9 @@ func HandleCStatus(ctx context.Context, status *C.CStatus, extraInfo string, fie return err } -// HandleCProto deal with the result proto returned from CGO -func HandleCProto(cRes *C.CProto, msg proto.Message) error { - // Standalone CProto is protobuf created by C side, - // Passed from c side - // memory is managed manually - lease, blob := cgoconverter.UnsafeGoBytes(&cRes.proto_blob, int(cRes.proto_size)) - defer cgoconverter.Release(lease) - +// UnmarshalCProto unmarshal the proto from C memory +func UnmarshalCProto(cRes *C.CProto, msg proto.Message) error { + blob := (*(*[math.MaxInt32]byte)(cRes.proto_blob))[:int(cRes.proto_size):int(cRes.proto_size)] return proto.Unmarshal(blob, msg) } diff --git a/internal/querynodev2/segments/retrieve_test.go b/internal/querynodev2/segments/retrieve_test.go index ea58f2802b..08b7aaa48a 100644 --- a/internal/querynodev2/segments/retrieve_test.go +++ b/internal/querynodev2/segments/retrieve_test.go @@ -163,6 +163,10 @@ func (suite *RetrieveSuite) TestRetrieveSealed() { suite.NoError(err) suite.Len(res[0].Result.Offset, 3) suite.manager.Segment.Unpin(segments) + + resultByOffsets, err := suite.sealed.RetrieveByOffsets(context.Background(), plan, []int64{0, 1}) + suite.NoError(err) + suite.Len(resultByOffsets.Offset, 0) } func (suite *RetrieveSuite) TestRetrieveGrowing() { @@ -182,6 +186,10 @@ func (suite *RetrieveSuite) TestRetrieveGrowing() { suite.NoError(err) suite.Len(res[0].Result.Offset, 3) suite.manager.Segment.Unpin(segments) + + resultByOffsets, err := suite.growing.RetrieveByOffsets(context.Background(), plan, []int64{0, 1}) + suite.NoError(err) + suite.Len(resultByOffsets.Offset, 0) } func (suite *RetrieveSuite) TestRetrieveStreamSealed() { diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 83404096cd..08e6707b1b 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -17,8 +17,9 @@ package segments /* -#cgo pkg-config: milvus_segcore +#cgo pkg-config: milvus_segcore milvus_futures +#include "futures/future_c.h" #include "segcore/collection_c.h" #include "segcore/plan_c.h" #include "segcore/reduce_c.h" @@ -53,6 +54,7 @@ import ( "github.com/milvus-io/milvus/internal/querynodev2/pkoracle" "github.com/milvus-io/milvus/internal/querynodev2/segments/state" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/cgo" typeutil_internal "github.com/milvus-io/milvus/internal/util/typeutil" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" @@ -565,34 +567,39 @@ func (s *LocalSegment) Search(ctx context.Context, searchReq *SearchRequest) (*S defer s.ptrLock.RUnlock() traceCtx := ParseCTraceContext(ctx) + defer runtime.KeepAlive(traceCtx) + defer runtime.KeepAlive(searchReq) hasIndex := s.ExistIndex(searchReq.searchFieldID) log = log.With(zap.Bool("withIndex", hasIndex)) log.Debug("search segment...") - var searchResult SearchResult - var status C.CStatus - GetSQPool().Submit(func() (any, error) { - tr := timerecord.NewTimeRecorder("cgoSearch") - status = C.Search(traceCtx.ctx, - s.ptr, - searchReq.plan.cSearchPlan, - searchReq.cPlaceholderGroup, - C.uint64_t(searchReq.mvccTimestamp), - &searchResult.cSearchResult, - ) - runtime.KeepAlive(traceCtx) - metrics.QueryNodeSQSegmentLatencyInCore.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel).Observe(float64(tr.ElapseSpan().Milliseconds())) - return nil, nil - }).Await() - if err := HandleCStatus(ctx, &status, "Search failed", - zap.Int64("collectionID", s.Collection()), - zap.Int64("segmentID", s.ID()), - zap.String("segmentType", s.segmentType.String())); err != nil { + tr := timerecord.NewTimeRecorder("cgoSearch") + + future := cgo.Async( + ctx, + func() cgo.CFuturePtr { + return (cgo.CFuturePtr)(C.AsyncSearch( + traceCtx.ctx, + s.ptr, + searchReq.plan.cSearchPlan, + searchReq.cPlaceholderGroup, + C.uint64_t(searchReq.mvccTimestamp), + )) + }, + cgo.WithName("search"), + ) + defer future.Release() + result, err := future.BlockAndLeakyGet() + if err != nil { + log.Warn("Search failed") return nil, err } + metrics.QueryNodeSQSegmentLatencyInCore.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel).Observe(float64(tr.ElapseSpan().Milliseconds())) log.Debug("search segment done") - return &searchResult, nil + return &SearchResult{ + cSearchResult: (C.CSearchResult)(result), + }, nil } func (s *LocalSegment) Retrieve(ctx context.Context, plan *RetrievePlan) (*segcorepb.RetrieveResults, error) { @@ -612,69 +619,65 @@ func (s *LocalSegment) Retrieve(ctx context.Context, plan *RetrievePlan) (*segco log.Debug("begin to retrieve") traceCtx := ParseCTraceContext(ctx) + defer runtime.KeepAlive(traceCtx) + defer runtime.KeepAlive(plan) maxLimitSize := paramtable.Get().QuotaConfig.MaxOutputSize.GetAsInt64() - var retrieveResult RetrieveResult - var status C.CStatus - GetSQPool().Submit(func() (any, error) { - ts := C.uint64_t(plan.Timestamp) - tr := timerecord.NewTimeRecorder("cgoRetrieve") - status = C.Retrieve(traceCtx.ctx, - s.ptr, - plan.cRetrievePlan, - ts, - &retrieveResult.cRetrieveResult, - C.int64_t(maxLimitSize), - C.bool(plan.ignoreNonPk)) - runtime.KeepAlive(traceCtx) + tr := timerecord.NewTimeRecorder("cgoRetrieve") - metrics.QueryNodeSQSegmentLatencyInCore.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), - metrics.QueryLabel).Observe(float64(tr.ElapseSpan().Milliseconds())) - log.Debug("cgo retrieve done", zap.Duration("timeTaken", tr.ElapseSpan())) - return nil, nil - }).Await() - - if err := HandleCStatus(ctx, &status, "Retrieve failed", - zap.Int64("collectionID", s.Collection()), - zap.Int64("partitionID", s.Partition()), - zap.Int64("segmentID", s.ID()), - zap.Int64("msgID", plan.msgID), - zap.String("segmentType", s.segmentType.String())); err != nil { + future := cgo.Async( + ctx, + func() cgo.CFuturePtr { + return (cgo.CFuturePtr)(C.AsyncRetrieve( + traceCtx.ctx, + s.ptr, + plan.cRetrievePlan, + C.uint64_t(plan.Timestamp), + C.int64_t(maxLimitSize), + C.bool(plan.ignoreNonPk), + )) + }, + cgo.WithName("retrieve"), + ) + defer future.Release() + result, err := future.BlockAndLeakyGet() + if err != nil { + log.Warn("Retrieve failed") return nil, err } + defer C.DeleteRetrieveResult((*C.CRetrieveResult)(result)) + + metrics.QueryNodeSQSegmentLatencyInCore.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), + metrics.QueryLabel).Observe(float64(tr.ElapseSpan().Milliseconds())) _, span := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, "partial-segcore-results-deserialization") defer span.End() - result := new(segcorepb.RetrieveResults) - if err := HandleCProto(&retrieveResult.cRetrieveResult, result); err != nil { + retrieveResult := new(segcorepb.RetrieveResults) + if err := UnmarshalCProto((*C.CRetrieveResult)(result), retrieveResult); err != nil { + log.Warn("unmarshal retrieve result failed", zap.Error(err)) return nil, err } log.Debug("retrieve segment done", - zap.Int("resultNum", len(result.Offset)), + zap.Int("resultNum", len(retrieveResult.Offset)), ) - // Sort was done by the segcore. // sort.Sort(&byPK{result}) - return result, nil + return retrieveResult, nil } func (s *LocalSegment) RetrieveByOffsets(ctx context.Context, plan *RetrievePlan, offsets []int64) (*segcorepb.RetrieveResults, error) { + if len(offsets) == 0 { + return nil, merr.WrapErrParameterInvalid("segment offsets", "empty offsets") + } + if !s.ptrLock.RLockIf(state.IsNotReleased) { // TODO: check if the segment is readable but not released. too many related logic need to be refactor. return nil, merr.WrapErrSegmentNotLoaded(s.ID(), "segment released") } defer s.ptrLock.RUnlock() - if s.ptr == nil { - return nil, merr.WrapErrSegmentNotLoaded(s.ID(), "segment released") - } - - if len(offsets) == 0 { - return nil, merr.WrapErrParameterInvalid("segment offsets", "empty offsets") - } - fields := []zap.Field{ zap.Int64("collectionID", s.Collection()), zap.Int64("partitionID", s.Partition()), @@ -686,40 +689,49 @@ func (s *LocalSegment) RetrieveByOffsets(ctx context.Context, plan *RetrievePlan log := log.Ctx(ctx).With(fields...) log.Debug("begin to retrieve by offsets") - - traceCtx := ParseCTraceContext(ctx) - - var retrieveResult RetrieveResult - var status C.CStatus - tr := timerecord.NewTimeRecorder("cgoRetrieveByOffsets") - status = C.RetrieveByOffsets(traceCtx.ctx, - s.ptr, - plan.cRetrievePlan, - &retrieveResult.cRetrieveResult, - (*C.int64_t)(unsafe.Pointer(&offsets[0])), - C.int64_t(len(offsets))) - runtime.KeepAlive(traceCtx) + traceCtx := ParseCTraceContext(ctx) + defer runtime.KeepAlive(traceCtx) + defer runtime.KeepAlive(plan) + defer runtime.KeepAlive(offsets) + + future := cgo.Async( + ctx, + func() cgo.CFuturePtr { + return (cgo.CFuturePtr)(C.AsyncRetrieveByOffsets( + traceCtx.ctx, + s.ptr, + plan.cRetrievePlan, + (*C.int64_t)(unsafe.Pointer(&offsets[0])), + C.int64_t(len(offsets)), + )) + }, + cgo.WithName("retrieve-by-offsets"), + ) + defer future.Release() + result, err := future.BlockAndLeakyGet() + if err != nil { + log.Warn("RetrieveByOffsets failed") + return nil, err + } + defer C.DeleteRetrieveResult((*C.CRetrieveResult)(result)) metrics.QueryNodeSQSegmentLatencyInCore.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel).Observe(float64(tr.ElapseSpan().Milliseconds())) - log.Debug("cgo retrieve by offsets done", zap.Duration("timeTaken", tr.ElapseSpan())) - - if err := HandleCStatus(ctx, &status, "RetrieveByOffsets failed", fields...); err != nil { - return nil, err - } _, span := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, "reduced-segcore-results-deserialization") defer span.End() - result := new(segcorepb.RetrieveResults) - if err := HandleCProto(&retrieveResult.cRetrieveResult, result); err != nil { + retrieveResult := new(segcorepb.RetrieveResults) + if err := UnmarshalCProto((*C.CRetrieveResult)(result), retrieveResult); err != nil { + log.Warn("unmarshal retrieve by offsets result failed", zap.Error(err)) return nil, err } - log.Debug("retrieve by segment offsets done") - - return result, nil + log.Debug("retrieve by segment offsets done", + zap.Int("resultNum", len(retrieveResult.Offset)), + ) + return retrieveResult, nil } func (s *LocalSegment) GetFieldDataPath(index *IndexedFieldInfo, offset int64) (dataPath string, offsetInBinlog int64) { diff --git a/internal/util/cgo/errors.go b/internal/util/cgo/errors.go new file mode 100644 index 0000000000..c0bb6e482f --- /dev/null +++ b/internal/util/cgo/errors.go @@ -0,0 +1,27 @@ +package cgo + +/* +#cgo pkg-config: milvus_common + +#include "common/type_c.h" +#include +*/ +import "C" + +import ( + "unsafe" + + "github.com/milvus-io/milvus/pkg/util/merr" +) + +func ConsumeCStatusIntoError(status *C.CStatus) error { + if status.error_code == 0 { + return nil + } + errorCode := status.error_code + errorMsg := C.GoString(status.error_msg) + getCGOCaller().call("free", func() { + C.free(unsafe.Pointer(status.error_msg)) + }) + return merr.SegcoreError(int32(errorCode), errorMsg) +} diff --git a/internal/util/cgo/executor.go b/internal/util/cgo/executor.go new file mode 100644 index 0000000000..a589513469 --- /dev/null +++ b/internal/util/cgo/executor.go @@ -0,0 +1,36 @@ +package cgo + +/* +#cgo pkg-config: milvus_futures + +#include "futures/future_c.h" +*/ +import "C" + +import ( + "math" + + "go.uber.org/zap" + + "github.com/milvus-io/milvus/pkg/config" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/paramtable" +) + +// initExecutor initialize underlying cgo thread pool. +func initExecutor() { + pt := paramtable.Get() + initPoolSize := int(math.Ceil(pt.QueryNodeCfg.MaxReadConcurrency.GetAsFloat() * pt.QueryNodeCfg.CGOPoolSizeRatio.GetAsFloat())) + C.executor_set_thread_num(C.int(initPoolSize)) + + resetThreadNum := func(evt *config.Event) { + if evt.HasUpdated { + pt := paramtable.Get() + newSize := int(math.Ceil(pt.QueryNodeCfg.MaxReadConcurrency.GetAsFloat() * pt.QueryNodeCfg.CGOPoolSizeRatio.GetAsFloat())) + log.Info("reset cgo thread num", zap.Int("thread_num", newSize)) + C.executor_set_thread_num(C.int(newSize)) + } + } + pt.Watch(pt.QueryNodeCfg.MaxReadConcurrency.Key, config.NewHandler("cgo."+pt.QueryNodeCfg.MaxReadConcurrency.Key, resetThreadNum)) + pt.Watch(pt.QueryNodeCfg.CGOPoolSizeRatio.Key, config.NewHandler("cgo."+pt.QueryNodeCfg.CGOPoolSizeRatio.Key, resetThreadNum)) +} diff --git a/internal/util/cgo/futures.go b/internal/util/cgo/futures.go new file mode 100644 index 0000000000..3b6aadf454 --- /dev/null +++ b/internal/util/cgo/futures.go @@ -0,0 +1,192 @@ +package cgo + +/* +#cgo pkg-config: milvus_futures + +#include "futures/future_c.h" +#include + +extern void unlockMutex(void*); + +static inline void unlockMutexOnC(CLockedGoMutex* m) { + unlockMutex((void*)(m)); +} + +static inline void future_go_register_ready_callback(CFuture* f, CLockedGoMutex* m) { + future_register_ready_callback(f, unlockMutexOnC, m); +} +*/ +import "C" + +import ( + "context" + "sync" + "unsafe" + + "github.com/cockroachdb/errors" + + "github.com/milvus-io/milvus/pkg/util/merr" +) + +var ErrConsumed = errors.New("future is already consumed") + +// Would put this in futures.go but for the documented issue with +// exports and functions in preamble +// (https://code.google.com/p/go-wiki/wiki/cgo#Global_functions) +// +//export unlockMutex +func unlockMutex(p unsafe.Pointer) { + m := (*sync.Mutex)(p) + m.Unlock() +} + +type basicFuture interface { + // Context return the context of the future. + Context() context.Context + + // BlockUntilReady block until the future is ready or canceled. + // caller can call this method multiple times in different concurrent unit. + BlockUntilReady() + + // cancel the future with error. + cancel(error) +} + +type Future interface { + basicFuture + + // BlockAndLeakyGet block until the future is ready or canceled, and return the leaky result. + // Caller should only call once for BlockAndLeakyGet, otherwise the ErrConsumed will returned. + // Caller will get the merr.ErrSegcoreCancel or merr.ErrSegcoreTimeout respectively if the future is canceled or timeout. + // Caller will get other error if the underlying cgo function throws, otherwise caller will get result. + // Caller should free the result after used (defined by caller), otherwise the memory of result is leaked. + BlockAndLeakyGet() (unsafe.Pointer, error) + + // Release the resource of the future. + // !!! Release is not concurrent safe with other methods. + // It should be called only once after all method of future is returned. + Release() +} + +type ( + CFuturePtr unsafe.Pointer + CGOAsyncFunction = func() CFuturePtr +) + +// Async is a helper function to call a C async function that returns a future. +func Async(ctx context.Context, f CGOAsyncFunction, opts ...Opt) Future { + initCGO() + + options := getDefaultOpt() + // apply options. + for _, opt := range opts { + opt(options) + } + + // create a future for caller to use. + var cFuturePtr *C.CFuture + getCGOCaller().call(options.name, func() { + cFuturePtr = (*C.CFuture)(f()) + }) + + ctx, cancel := context.WithCancel(ctx) + future := &futureImpl{ + closure: f, + ctx: ctx, + ctxCancel: cancel, + releaserOnce: sync.Once{}, + future: cFuturePtr, + opts: options, + state: newFutureState(), + } + + // register the future to do timeout notification. + futureManager.Register(future) + return future +} + +type futureImpl struct { + ctx context.Context + ctxCancel context.CancelFunc + future *C.CFuture + closure CGOAsyncFunction + opts *options + state futureState + releaserOnce sync.Once +} + +func (f *futureImpl) Context() context.Context { + return f.ctx +} + +func (f *futureImpl) BlockUntilReady() { + f.blockUntilReady() +} + +func (f *futureImpl) BlockAndLeakyGet() (unsafe.Pointer, error) { + f.blockUntilReady() + + if !f.state.intoConsumed() { + return nil, ErrConsumed + } + + var ptr unsafe.Pointer + var status C.CStatus + getCGOCaller().call("future_leak_and_get", func() { + status = C.future_leak_and_get(f.future, &ptr) + }) + err := ConsumeCStatusIntoError(&status) + + if errors.Is(err, merr.ErrSegcoreFollyCancel) { + // mark the error with context error. + return nil, errors.Mark(err, f.ctx.Err()) + } + return ptr, err +} + +func (f *futureImpl) Release() { + // block until ready to release the future. + f.blockUntilReady() + // release the future. + getCGOCaller().call("future_destroy", func() { + C.future_destroy(f.future) + }) +} + +func (f *futureImpl) cancel(err error) { + if !f.state.checkUnready() { + // only unready future can be canceled. + // a ready future' cancel make no sense. + return + } + + if errors.IsAny(err, context.DeadlineExceeded, context.Canceled) { + getCGOCaller().call("future_cancel", func() { + C.future_cancel(f.future) + }) + return + } + panic("unreachable: invalid cancel error type") +} + +func (f *futureImpl) blockUntilReady() { + if !f.state.checkUnready() { + // only unready future should be block until ready. + return + } + + mu := &sync.Mutex{} + mu.Lock() + getCGOCaller().call("future_go_register_ready_callback", func() { + C.future_go_register_ready_callback(f.future, (*C.CLockedGoMutex)(unsafe.Pointer(mu))) + }) + mu.Lock() + + // mark the future as ready at go side to avoid more cgo calls. + f.state.intoReady() + // notify the future manager that the future is ready. + f.ctxCancel() + if f.opts.releaser != nil { + f.releaserOnce.Do(f.opts.releaser) + } +} diff --git a/internal/util/cgo/futures_test.go b/internal/util/cgo/futures_test.go new file mode 100644 index 0000000000..39a58f4ab1 --- /dev/null +++ b/internal/util/cgo/futures_test.go @@ -0,0 +1,272 @@ +package cgo + +import ( + "context" + "fmt" + "os" + "runtime" + "sync" + "testing" + "time" + + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" +) + +func TestMain(m *testing.M) { + paramtable.Init() + initCGO() + exitCode := m.Run() + if exitCode > 0 { + os.Exit(exitCode) + } +} + +func TestFutureWithSuccessCase(t *testing.T) { + // Test success case. + future := createFutureWithTestCase(context.Background(), testCase{ + interval: 100 * time.Millisecond, + loopCnt: 10, + caseNo: 100, + }) + defer future.Release() + + start := time.Now() + future.BlockUntilReady() // test block until ready too. + result, err := future.BlockAndLeakyGet() + assert.NoError(t, err) + assert.Equal(t, 100, getCInt(result)) + // The inner function sleep 1 seconds, so the future cost must be greater than 0.5 seconds. + assert.Greater(t, time.Since(start).Seconds(), 0.5) + // free the result after used. + freeCInt(result) + runtime.GC() + + _, err = future.BlockAndLeakyGet() + assert.ErrorIs(t, err, ErrConsumed) +} + +func TestFutureWithCaseNoInterrupt(t *testing.T) { + // Test success case. + future := createFutureWithTestCase(context.Background(), testCase{ + interval: 100 * time.Millisecond, + loopCnt: 10, + caseNo: caseNoNoInterrupt, + }) + defer future.Release() + + start := time.Now() + future.BlockUntilReady() // test block until ready too. + result, err := future.BlockAndLeakyGet() + assert.NoError(t, err) + assert.Equal(t, 0, getCInt(result)) + // The inner function sleep 1 seconds, so the future cost must be greater than 0.5 seconds. + assert.Greater(t, time.Since(start).Seconds(), 0.5) + // free the result after used. + freeCInt(result) + + // Test cancellation on no interrupt handling case. + start = time.Now() + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + future = createFutureWithTestCase(ctx, testCase{ + interval: 100 * time.Millisecond, + loopCnt: 20, + caseNo: caseNoNoInterrupt, + }) + defer future.Release() + + result, err = future.BlockAndLeakyGet() + // the future is timeout by the context after 200ms, but the underlying task doesn't handle the cancel, the future will return after 2s. + assert.Greater(t, time.Since(start).Seconds(), 2.0) + assert.NoError(t, err) + assert.NotNil(t, result) + assert.Equal(t, 0, getCInt(result)) + freeCInt(result) +} + +// TestFutures test the future implementation. +func TestFutures(t *testing.T) { + // Test failed case, throw folly exception. + future := createFutureWithTestCase(context.Background(), testCase{ + interval: 100 * time.Millisecond, + loopCnt: 10, + caseNo: caseNoThrowStdException, + }) + defer future.Release() + + start := time.Now() + future.BlockUntilReady() // test block until ready too. + result, err := future.BlockAndLeakyGet() + assert.Error(t, err) + assert.ErrorIs(t, err, merr.ErrSegcoreUnsupported) + assert.Nil(t, result) + // The inner function sleep 1 seconds, so the future cost must be greater than 0.5 seconds. + assert.Greater(t, time.Since(start).Seconds(), 0.5) + + // Test failed case, throw std exception. + future = createFutureWithTestCase(context.Background(), testCase{ + interval: 100 * time.Millisecond, + loopCnt: 10, + caseNo: caseNoThrowFollyException, + }) + defer future.Release() + start = time.Now() + future.BlockUntilReady() // test block until ready too. + result, err = future.BlockAndLeakyGet() + assert.Error(t, err) + assert.ErrorIs(t, err, merr.ErrSegcoreFollyOtherException) + assert.Nil(t, result) + // The inner function sleep 1 seconds, so the future cost must be greater than 0.5 seconds. + assert.Greater(t, time.Since(start).Seconds(), 0.5) + // free the result after used. + + // Test failed case, throw std exception. + future = createFutureWithTestCase(context.Background(), testCase{ + interval: 100 * time.Millisecond, + loopCnt: 10, + caseNo: caseNoThrowSegcoreException, + }) + defer future.Release() + start = time.Now() + future.BlockUntilReady() // test block until ready too. + result, err = future.BlockAndLeakyGet() + assert.Error(t, err) + assert.ErrorIs(t, err, merr.ErrSegcoreUnsupported) + assert.Nil(t, result) + // The inner function sleep 1 seconds, so the future cost must be greater than 0.5 seconds. + assert.Greater(t, time.Since(start).Seconds(), 0.5) + // free the result after used. + + // Test cancellation. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + future = createFutureWithTestCase(ctx, testCase{ + interval: 100 * time.Millisecond, + loopCnt: 20, + caseNo: 100, + }) + defer future.Release() + // canceled before the future(2s) is ready. + go func() { + time.Sleep(200 * time.Millisecond) + cancel() + }() + start = time.Now() + result, err = future.BlockAndLeakyGet() + // the future is canceled by the context after 200ms, so the future should be done in 1s but not 2s. + assert.Less(t, time.Since(start).Seconds(), 1.0) + assert.Error(t, err) + assert.ErrorIs(t, err, merr.ErrSegcoreFollyCancel) + assert.True(t, errors.Is(err, context.Canceled)) + assert.Nil(t, result) + + // Test cancellation. + ctx, cancel = context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + future = createFutureWithTestCase(ctx, testCase{ + interval: 100 * time.Millisecond, + loopCnt: 20, + caseNo: 100, + }) + defer future.Release() + start = time.Now() + result, err = future.BlockAndLeakyGet() + // the future is timeout by the context after 200ms, so the future should be done in 1s but not 2s. + assert.Less(t, time.Since(start).Seconds(), 1.0) + assert.Error(t, err) + assert.ErrorIs(t, err, merr.ErrSegcoreFollyCancel) + assert.True(t, errors.Is(err, context.DeadlineExceeded)) + assert.Nil(t, result) + runtime.GC() +} + +func TestConcurrent(t *testing.T) { + // Test is compatible with old implementation of fast fail future. + // So it's complicated and not easy to understand. + wg := sync.WaitGroup{} + for i := 0; i < 3; i++ { + wg.Add(4) + // success case + go func() { + defer wg.Done() + // Test success case. + future := createFutureWithTestCase(context.Background(), testCase{ + interval: 100 * time.Millisecond, + loopCnt: 10, + caseNo: 100, + }) + defer future.Release() + result, err := future.BlockAndLeakyGet() + assert.NoError(t, err) + assert.Equal(t, 100, getCInt(result)) + freeCInt(result) + }() + + // fail case + go func() { + defer wg.Done() + // Test success case. + future := createFutureWithTestCase(context.Background(), testCase{ + interval: 100 * time.Millisecond, + loopCnt: 10, + caseNo: caseNoThrowStdException, + }) + defer future.Release() + result, err := future.BlockAndLeakyGet() + assert.Error(t, err) + assert.Nil(t, result) + }() + + // timeout case + go func() { + defer wg.Done() + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + future := createFutureWithTestCase(ctx, testCase{ + interval: 100 * time.Millisecond, + loopCnt: 20, + caseNo: 100, + }) + defer future.Release() + result, err := future.BlockAndLeakyGet() + assert.Error(t, err) + assert.ErrorIs(t, err, merr.ErrSegcoreFollyCancel) + assert.True(t, errors.Is(err, context.DeadlineExceeded)) + assert.Nil(t, result) + }() + + // no interrupt with timeout case + go func() { + defer wg.Done() + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + future := createFutureWithTestCase(ctx, testCase{ + interval: 100 * time.Millisecond, + loopCnt: 10, + caseNo: caseNoNoInterrupt, + }) + defer future.Release() + result, err := future.BlockAndLeakyGet() + if err == nil { + assert.Equal(t, 0, getCInt(result)) + } else { + // the future may be queued and not started, + // so the underlying task may be throw a cancel exception if it's not started. + assert.ErrorIs(t, err, merr.ErrSegcoreFollyCancel) + assert.True(t, errors.Is(err, context.DeadlineExceeded)) + } + freeCInt(result) + }() + } + wg.Wait() + assert.Eventually(t, func() bool { + stat := futureManager.Stat() + fmt.Printf("active count: %d\n", stat.ActiveCount) + return stat.ActiveCount == 0 + }, 5*time.Second, 100*time.Millisecond) + runtime.GC() +} diff --git a/internal/util/cgo/futures_test_case.go b/internal/util/cgo/futures_test_case.go new file mode 100644 index 0000000000..3cc933c095 --- /dev/null +++ b/internal/util/cgo/futures_test_case.go @@ -0,0 +1,48 @@ +//go:build test +// +build test + +package cgo + +/* +#cgo pkg-config: milvus_futures + +#include "futures/future_c.h" +#include + +*/ +import "C" + +import ( + "context" + "time" + "unsafe" +) + +const ( + caseNoNoInterrupt int = 0 + caseNoThrowStdException int = 1 + caseNoThrowFollyException int = 2 + caseNoThrowSegcoreException int = 3 +) + +type testCase struct { + interval time.Duration + loopCnt int + caseNo int +} + +func createFutureWithTestCase(ctx context.Context, testCase testCase) Future { + f := func() CFuturePtr { + return (CFuturePtr)(C.future_create_test_case(C.int(testCase.interval.Milliseconds()), C.int(testCase.loopCnt), C.int(testCase.caseNo))) + } + future := Async(ctx, f, WithName("createFutureWithTestCase")) + return future +} + +func getCInt(p unsafe.Pointer) int { + return int(*(*C.int)(p)) +} + +func freeCInt(p unsafe.Pointer) { + C.free(p) +} diff --git a/internal/util/cgo/manager_active.go b/internal/util/cgo/manager_active.go new file mode 100644 index 0000000000..37c6011f18 --- /dev/null +++ b/internal/util/cgo/manager_active.go @@ -0,0 +1,114 @@ +package cgo + +import ( + "reflect" + "sync" + + "go.uber.org/atomic" + + "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util/paramtable" +) + +const ( + registerIndex = 0 + maxSelectCase = 65535 + defaultRegisterBuf = 1 +) + +var ( + futureManager *activeFutureManager + initOnce sync.Once +) + +// initCGO initializes the cgo caller and future manager. +func initCGO() { + initOnce.Do(func() { + nodeID := paramtable.GetStringNodeID() + initCaller(nodeID) + initExecutor() + futureManager = newActiveFutureManager(nodeID) + futureManager.Run() + }) +} + +type futureManagerStat struct { + ActiveCount int64 +} + +func newActiveFutureManager(nodeID string) *activeFutureManager { + manager := &activeFutureManager{ + activeCount: atomic.NewInt64(0), + activeFutures: make([]basicFuture, 0), + cases: make([]reflect.SelectCase, 1), + register: make(chan basicFuture, defaultRegisterBuf), + nodeID: nodeID, + } + manager.cases[0] = reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(manager.register), + } + return manager +} + +// activeFutureManager manages the active futures. +// it will transfer the cancel signal into cgo. +type activeFutureManager struct { + activeCount *atomic.Int64 + activeFutures []basicFuture + cases []reflect.SelectCase + register chan basicFuture + nodeID string +} + +// Run starts the active future manager. +func (m *activeFutureManager) Run() { + go func() { + for { + m.doSelect() + } + }() +} + +// Register registers a future when it's created into the manager. +func (m *activeFutureManager) Register(c basicFuture) { + m.register <- c +} + +// Stat returns the stat of the manager, only for testing now. +func (m *activeFutureManager) Stat() futureManagerStat { + return futureManagerStat{ + ActiveCount: m.activeCount.Load(), + } +} + +// doSelect selects the active futures and cancel the finished ones. +func (m *activeFutureManager) doSelect() { + index, newCancelableObject, _ := reflect.Select(m.getSelectableCases()) + if index == registerIndex { + newCancelable := newCancelableObject.Interface().(basicFuture) + m.cases = append(m.cases, reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(newCancelable.Context().Done()), + }) + m.activeFutures = append(m.activeFutures, newCancelable) + } else { + m.cases = append(m.cases[:index], m.cases[index+1:]...) + offset := index - 1 + // cancel the future and move it into gc manager. + m.activeFutures[offset].cancel(m.activeFutures[offset].Context().Err()) + m.activeFutures = append(m.activeFutures[:offset], m.activeFutures[offset+1:]...) + } + activeTotal := len(m.activeFutures) + m.activeCount.Store(int64(activeTotal)) + metrics.ActiveFutureTotal.WithLabelValues( + m.nodeID, + ).Set(float64(activeTotal)) +} + +func (m *activeFutureManager) getSelectableCases() []reflect.SelectCase { + if len(m.cases) <= maxSelectCase { + return m.cases + } + return m.cases[0:maxSelectCase] +} diff --git a/internal/util/cgo/options.go b/internal/util/cgo/options.go new file mode 100644 index 0000000000..96c25c4357 --- /dev/null +++ b/internal/util/cgo/options.go @@ -0,0 +1,32 @@ +package cgo + +func getDefaultOpt() *options { + return &options{ + name: "unknown", + releaser: nil, + } +} + +type options struct { + name string + releaser func() +} + +// Opt is the option type for future. +type Opt func(*options) + +// WithReleaser sets the releaser function. +// When a future is ready, the releaser function will be called once. +func WithReleaser(releaser func()) Opt { + return func(o *options) { + o.releaser = releaser + } +} + +// WithName sets the name of the future. +// Only used for metrics. +func WithName(name string) Opt { + return func(o *options) { + o.name = name + } +} diff --git a/internal/util/cgo/pool.go b/internal/util/cgo/pool.go new file mode 100644 index 0000000000..789db284e9 --- /dev/null +++ b/internal/util/cgo/pool.go @@ -0,0 +1,56 @@ +package cgo + +import ( + "math" + "runtime" + "time" + + "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util/hardware" + "github.com/milvus-io/milvus/pkg/util/paramtable" +) + +var caller *cgoCaller + +func initCaller(nodeID string) { + chSize := int64(math.Ceil(float64(hardware.GetCPUNum()) * paramtable.Get().QueryNodeCfg.CGOPoolSizeRatio.GetAsFloat())) + if chSize <= 0 { + chSize = 1 + } + caller = &cgoCaller{ + ch: make(chan struct{}, chSize), + nodeID: nodeID, + } +} + +// getCGOCaller returns the cgoCaller instance. +func getCGOCaller() *cgoCaller { + return caller +} + +// cgoCaller is a limiter to restrict the number of concurrent cgo calls. +type cgoCaller struct { + ch chan struct{} + nodeID string +} + +// call calls the work function with a lock to restrict the number of concurrent cgo calls. +// it collect some metrics too. +func (c *cgoCaller) call(name string, work func()) { + start := time.Now() + c.ch <- struct{}{} + queueTime := time.Since(start) + metrics.CGOQueueDuration.WithLabelValues(c.nodeID).Observe(queueTime.Seconds()) + + runtime.LockOSThread() + defer func() { + runtime.UnlockOSThread() + <-c.ch + + metrics.RunningCgoCallTotal.WithLabelValues(c.nodeID).Dec() + total := time.Since(start) - queueTime + metrics.CGODuration.WithLabelValues(c.nodeID, name).Observe(total.Seconds()) + }() + metrics.RunningCgoCallTotal.WithLabelValues(c.nodeID).Inc() + work() +} diff --git a/internal/util/cgo/state.go b/internal/util/cgo/state.go new file mode 100644 index 0000000000..db262c4b60 --- /dev/null +++ b/internal/util/cgo/state.go @@ -0,0 +1,38 @@ +package cgo + +import "go.uber.org/atomic" + +const ( + stateUnready int32 = iota + stateReady + stateConsumed +) + +// newFutureState creates a new futureState. +func newFutureState() futureState { + return futureState{ + inner: atomic.NewInt32(stateUnready), + } +} + +// futureState is a state machine for future. +// unready --BlockUntilReady--> ready --BlockAndLeakyGet--> consumed +type futureState struct { + inner *atomic.Int32 +} + +// intoReady sets the state to ready. +func (s *futureState) intoReady() { + s.inner.CompareAndSwap(stateUnready, stateReady) +} + +// intoConsumed sets the state to consumed. +// if the state is not ready, it does nothing and returns false. +func (s *futureState) intoConsumed() bool { + return s.inner.CompareAndSwap(stateReady, stateConsumed) +} + +// checkUnready checks if the state is unready. +func (s *futureState) checkUnready() bool { + return s.inner.Load() == stateUnready +} diff --git a/pkg/metrics/cgo_metrics.go b/pkg/metrics/cgo_metrics.go new file mode 100644 index 0000000000..d237493a40 --- /dev/null +++ b/pkg/metrics/cgo_metrics.go @@ -0,0 +1,86 @@ +package metrics + +import ( + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +var ( + subsystemCGO = "cgo" + cgoLabelName = "name" + once sync.Once + bucketsForCGOCall = []float64{ + 10 * time.Nanosecond.Seconds(), + 100 * time.Nanosecond.Seconds(), + 250 * time.Nanosecond.Seconds(), + 500 * time.Nanosecond.Seconds(), + time.Microsecond.Seconds(), + 10 * time.Microsecond.Seconds(), + 20 * time.Microsecond.Seconds(), + 50 * time.Microsecond.Seconds(), + 100 * time.Microsecond.Seconds(), + 250 * time.Microsecond.Seconds(), + 500 * time.Microsecond.Seconds(), + time.Millisecond.Seconds(), + 2 * time.Millisecond.Seconds(), + 10 * time.Millisecond.Seconds(), + } + + ActiveFutureTotal = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: milvusNamespace, + Subsystem: subsystemCGO, + Name: "active_future_total", + Help: "Total number of active futures.", + }, []string{ + nodeIDLabelName, + }, + ) + + RunningCgoCallTotal = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: milvusNamespace, + Subsystem: subsystemCGO, + Name: "running_cgo_call_total", + Help: "Total number of running cgo calls.", + }, []string{ + nodeIDLabelName, + }) + + CGODuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: milvusNamespace, + Subsystem: subsystemCGO, + Name: "cgo_duration_seconds", + Help: "Histogram of cgo call duration in seconds.", + Buckets: bucketsForCGOCall, + }, []string{ + nodeIDLabelName, + cgoLabelName, + }, + ) + + CGOQueueDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: milvusNamespace, + Subsystem: subsystemCGO, + Name: "cgo_queue_duration_seconds", + Help: "Duration of cgo call in queue.", + Buckets: bucketsForCGOCall, + }, []string{ + nodeIDLabelName, + }, + ) +) + +// RegisterCGOMetrics registers the cgo metrics. +func RegisterCGOMetrics(registry *prometheus.Registry) { + once.Do(func() { + registry.MustRegister(ActiveFutureTotal) + registry.MustRegister(RunningCgoCallTotal) + registry.MustRegister(CGODuration) + registry.MustRegister(CGOQueueDuration) + }) +} diff --git a/pkg/metrics/metrics_test.go b/pkg/metrics/metrics_test.go index 99c2d8d414..03bb0d879e 100644 --- a/pkg/metrics/metrics_test.go +++ b/pkg/metrics/metrics_test.go @@ -38,6 +38,7 @@ func TestRegisterMetrics(t *testing.T) { RegisterMetaMetrics(r) RegisterStorageMetrics(r) RegisterMsgStreamMetrics(r) + RegisterCGOMetrics(r) }) } diff --git a/pkg/metrics/querynode_metrics.go b/pkg/metrics/querynode_metrics.go index c506df64df..c06bb4fb85 100644 --- a/pkg/metrics/querynode_metrics.go +++ b/pkg/metrics/querynode_metrics.go @@ -805,6 +805,8 @@ func RegisterQueryNode(registry *prometheus.Registry) { registry.MustRegister(QueryNodeApplyBFCost) registry.MustRegister(QueryNodeForwardDeleteCost) registry.MustRegister(QueryNodeSegmentPruneRatio) + // Add cgo metrics + RegisterCGOMetrics(registry) } func CleanupQueryNodeCollectionMetrics(nodeID int64, collectionID int64) { diff --git a/pkg/util/merr/errors.go b/pkg/util/merr/errors.go index 3cf4a5378d..7e7c602ff3 100644 --- a/pkg/util/merr/errors.go +++ b/pkg/util/merr/errors.go @@ -175,9 +175,11 @@ var ( ErrInvalidStreamObj = newMilvusError("invalid stream object", 1903, false) // Segcore related - ErrSegcore = newMilvusError("segcore error", 2000, false) - ErrSegcoreUnsupported = newMilvusError("segcore unsupported error", 2001, false) - ErrSegcorePretendFinished = newMilvusError("segcore pretend finished", 2002, false) + ErrSegcore = newMilvusError("segcore error", 2000, false) + ErrSegcoreUnsupported = newMilvusError("segcore unsupported error", 2001, false) + ErrSegcorePretendFinished = newMilvusError("segcore pretend finished", 2002, false) + ErrSegcoreFollyOtherException = newMilvusError("segcore folly other exception", 2037, false) // throw from segcore. + ErrSegcoreFollyCancel = newMilvusError("segcore Future was canceled", 2038, false) // throw from segcore. // Do NOT export this, // never allow programmer using this, keep only for converting unknown error to milvusError diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 8af5fb9357..e7ffd8a600 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -3442,7 +3442,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # Key: "dataCoord.gc.interval", Version: "2.0.0", DefaultValue: "3600", - Doc: "gc interval in seconds", + Doc: "meta-based gc scanning interval in seconds", Export: true, } p.GCInterval.Init(base.mgr) @@ -3451,7 +3451,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # Key: "dataCoord.gc.scanInterval", Version: "2.4.0", DefaultValue: "168", // hours, default 7 * 24 hours - Doc: "garbage collection scan residue interval in hours", + Doc: "orphan file (file on oss but has not been registered on meta) on object storage garbage collection scanning interval in hours", Export: true, } p.GCScanIntervalInHour.Init(base.mgr) @@ -3461,7 +3461,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # Key: "dataCoord.gc.missingTolerance", Version: "2.0.0", DefaultValue: "86400", - Doc: "file meta missing tolerance duration in seconds, default to 24hr(1d)", + Doc: "orphan file gc tolerance duration in seconds (orphan file which last modified time before the tolerance interval ago will be deleted)", Export: true, } p.GCMissingTolerance.Init(base.mgr) @@ -3470,7 +3470,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # Key: "dataCoord.gc.dropTolerance", Version: "2.0.0", DefaultValue: "10800", - Doc: "file belongs to dropped entity tolerance duration in seconds. 3600", + Doc: "meta-based gc tolerace duration in seconds (file which meta is marked as dropped before the tolerace interval ago will be deleted)", Export: true, } p.GCDropTolerance.Init(base.mgr) diff --git a/scripts/run_go_unittest.sh b/scripts/run_go_unittest.sh index 0f542d3c91..3d68bcb014 100755 --- a/scripts/run_go_unittest.sh +++ b/scripts/run_go_unittest.sh @@ -107,6 +107,7 @@ go test -race -cover -tags dynamic,test "${MILVUS_DIR}/util/typeutil/..." -failf go test -race -cover -tags dynamic,test "${MILVUS_DIR}/util/importutilv2/..." -failfast -count=1 -ldflags="-r ${RPATH}" go test -race -cover -tags dynamic,test "${MILVUS_DIR}/util/proxyutil/..." -failfast -count=1 -ldflags="-r ${RPATH}" go test -race -cover -tags dynamic,test "${MILVUS_DIR}/util/initcore/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/util/cgo/..." -failfast -count=1 -ldflags="-r ${RPATH}" } function test_pkg()