mirror of https://github.com/milvus-io/milvus.git
fix: Implement singleflight for segcore ChunkCache (#34250)
See also #34249 Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/34229/head^2
parent
f7ecafe77d
commit
14e827dc6c
|
@ -15,32 +15,56 @@
|
|||
// limitations under the License.
|
||||
|
||||
#include "ChunkCache.h"
|
||||
#include <future>
|
||||
#include <memory>
|
||||
#include "common/Types.h"
|
||||
#include "mmap/Utils.h"
|
||||
|
||||
namespace milvus::storage {
|
||||
std::shared_ptr<ColumnBase>
|
||||
ChunkCache::Read(const std::string& filepath,
|
||||
const MmapChunkDescriptorPtr& descriptor) {
|
||||
// use rlock to get future
|
||||
{
|
||||
std::shared_lock lck(mutex_);
|
||||
auto it = columns_.find(filepath);
|
||||
if (it != columns_.end()) {
|
||||
AssertInfo(it->second, "unexpected null column, file={}", filepath);
|
||||
return it->second;
|
||||
lck.unlock();
|
||||
auto result = it->second.second.get();
|
||||
AssertInfo(result, "unexpected null column, file={}", filepath);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
auto field_data = DownloadAndDecodeRemoteFile(cm_.get(), filepath);
|
||||
|
||||
// lock for mutation
|
||||
std::unique_lock lck(mutex_);
|
||||
// double check no-futurn
|
||||
auto it = columns_.find(filepath);
|
||||
if (it != columns_.end()) {
|
||||
return it->second;
|
||||
lck.unlock();
|
||||
auto result = it->second.second.get();
|
||||
AssertInfo(result, "unexpected null column, file={}", filepath);
|
||||
return result;
|
||||
}
|
||||
|
||||
std::promise<std::shared_ptr<ColumnBase>> p;
|
||||
std::shared_future<std::shared_ptr<ColumnBase>> f = p.get_future();
|
||||
columns_.emplace(filepath, std::make_pair(std::move(p), f));
|
||||
lck.unlock();
|
||||
|
||||
// release lock and perform download and decode
|
||||
// other thread request same path shall get the future.
|
||||
auto field_data = DownloadAndDecodeRemoteFile(cm_.get(), filepath);
|
||||
auto column = Mmap(field_data->GetFieldData(), descriptor);
|
||||
|
||||
// set promise value to notify the future
|
||||
lck.lock();
|
||||
it = columns_.find(filepath);
|
||||
if (it != columns_.end()) {
|
||||
// check pair exists then set value
|
||||
it->second.first.set_value(column);
|
||||
}
|
||||
lck.unlock();
|
||||
AssertInfo(column, "unexpected null column, file={}", filepath);
|
||||
columns_.emplace(filepath, column);
|
||||
return column;
|
||||
}
|
||||
|
||||
|
@ -58,7 +82,7 @@ ChunkCache::Prefetch(const std::string& filepath) {
|
|||
return;
|
||||
}
|
||||
|
||||
auto column = it->second;
|
||||
auto column = it->second.second.get();
|
||||
auto ok = madvise(
|
||||
reinterpret_cast<void*>(const_cast<char*>(column->MmappedData())),
|
||||
column->ByteSize(),
|
||||
|
|
|
@ -15,6 +15,8 @@
|
|||
// limitations under the License.
|
||||
|
||||
#pragma once
|
||||
#include <future>
|
||||
#include <unordered_map>
|
||||
#include "storage/MmapChunkManager.h"
|
||||
#include "mmap/Column.h"
|
||||
|
||||
|
@ -60,8 +62,10 @@ class ChunkCache {
|
|||
CachePath(const std::string& filepath);
|
||||
|
||||
private:
|
||||
using ColumnTable =
|
||||
std::unordered_map<std::string, std::shared_ptr<ColumnBase>>;
|
||||
using ColumnTable = std::unordered_map<
|
||||
std::string,
|
||||
std::pair<std::promise<std::shared_ptr<ColumnBase>>,
|
||||
std::shared_future<std::shared_ptr<ColumnBase>>>>;
|
||||
|
||||
private:
|
||||
mutable std::shared_mutex mutex_;
|
||||
|
|
Loading…
Reference in New Issue