mirror of https://github.com/milvus-io/milvus.git
Add ut for growing segment load binlog (#26268)
Signed-off-by: xige-16 <xi.ge@zilliz.com>pull/26319/head
parent
a3c176045d
commit
5b8d716cbc
|
@ -10,15 +10,19 @@
|
|||
// or implied. See the License for the specific language governing permissions and limitations under the License
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
#include "common/SystemProperty.h"
|
||||
|
||||
#include "test_utils/Constants.h"
|
||||
#include "storage/LocalChunkManagerSingleton.h"
|
||||
#include "storage/RemoteChunkManagerSingleton.h"
|
||||
#include "test_utils/storage_test_utils.h"
|
||||
|
||||
int
|
||||
main(int argc, char** argv) {
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
milvus::storage::LocalChunkManagerSingleton::GetInstance().Init(
|
||||
TestLocalPath);
|
||||
milvus::storage::RemoteChunkManagerSingleton::GetInstance().Init(
|
||||
get_default_local_storage_config());
|
||||
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@
|
|||
#include "test_utils/DataGen.h"
|
||||
#include "test_utils/PbHelper.h"
|
||||
#include "test_utils/indexbuilder_test_utils.h"
|
||||
#include "test_utils/storage_test_utils.h"
|
||||
#include "query/generated/ExecExprVisitor.h"
|
||||
|
||||
namespace chrono = std::chrono;
|
||||
|
@ -3735,7 +3736,8 @@ TEST(CApiTest, SealedSegment_Update_Field_Size) {
|
|||
int row_size = 10;
|
||||
|
||||
// update row_size =10 with n rows
|
||||
auto status = UpdateFieldRawDataSize(segment, str_fid.get(), N, N * row_size);
|
||||
auto status =
|
||||
UpdateFieldRawDataSize(segment, str_fid.get(), N, N * row_size);
|
||||
ASSERT_EQ(status.error_code, Success);
|
||||
ASSERT_EQ(segment->get_field_avg_size(str_fid), row_size);
|
||||
|
||||
|
@ -3755,6 +3757,36 @@ TEST(CApiTest, SealedSegment_Update_Field_Size) {
|
|||
DeleteSegment(segment);
|
||||
}
|
||||
|
||||
TEST(CApiTest, GrowingSegment_Load_Field_Data) {
|
||||
auto schema = std::make_shared<Schema>();
|
||||
auto str_fid = schema->AddDebugField("string", DataType::VARCHAR);
|
||||
auto vec_fid = schema->AddDebugField(
|
||||
"vector_float", DataType::VECTOR_FLOAT, DIM, "L2");
|
||||
schema->set_primary_field_id(str_fid);
|
||||
|
||||
auto segment = CreateGrowingSegment(schema, empty_index_meta).release();
|
||||
|
||||
int N = ROW_COUNT;
|
||||
auto raw_data = DataGen(schema, N);
|
||||
|
||||
auto storage_config = get_default_local_storage_config();
|
||||
auto cm = storage::CreateChunkManager(storage_config);
|
||||
auto load_info =
|
||||
PrepareInsertBinlog(1,
|
||||
2,
|
||||
3,
|
||||
storage_config.root_path + "/" + "test_load_sealed",
|
||||
raw_data,
|
||||
cm);
|
||||
|
||||
auto status = LoadFieldData(segment, &load_info);
|
||||
ASSERT_EQ(status.error_code, Success);
|
||||
ASSERT_EQ(segment->get_real_count(), ROW_COUNT);
|
||||
ASSERT_NE(segment->get_field_avg_size(str_fid), 0);
|
||||
|
||||
DeleteSegment(segment);
|
||||
}
|
||||
|
||||
TEST(CApiTest, RetriveScalarFieldFromSealedSegmentWithIndex) {
|
||||
auto schema = std::make_shared<Schema>();
|
||||
auto i8_fid = schema->AddDebugField("age8", DataType::INT8);
|
||||
|
|
|
@ -17,18 +17,16 @@
|
|||
|
||||
#include "common/Slice.h"
|
||||
#include "common/Common.h"
|
||||
#include "storage/Event.h"
|
||||
#include "storage/ThreadPool.h"
|
||||
#include "storage/Util.h"
|
||||
#include "storage/DiskFileManagerImpl.h"
|
||||
#include "storage/LocalChunkManagerSingleton.h"
|
||||
|
||||
#include "test_utils/indexbuilder_test_utils.h"
|
||||
#include "test_utils/storage_test_utils.h"
|
||||
|
||||
using namespace std;
|
||||
using namespace milvus;
|
||||
using namespace milvus::storage;
|
||||
using namespace boost::filesystem;
|
||||
using namespace knowhere;
|
||||
|
||||
class DiskAnnFileManagerTest : public testing::Test {
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
#include "common/type_c.h"
|
||||
|
||||
constexpr int NB = 10;
|
||||
const CStorageConfig c_storage_config = get_default_cstorage_config();
|
||||
|
||||
TEST(FloatVecIndex, All) {
|
||||
auto index_type = knowhere::IndexEnum::INDEX_FAISS_IVFPQ;
|
||||
|
@ -200,7 +199,7 @@ TEST(CBoolIndexTest, All) {
|
|||
{ DeleteBinarySet(binary_set); }
|
||||
}
|
||||
|
||||
delete[] (char*)(half_ds->GetTensor());
|
||||
delete[](char*)(half_ds->GetTensor());
|
||||
}
|
||||
|
||||
// TODO: more scalar type.
|
||||
|
@ -317,6 +316,6 @@ TEST(CStringIndexTest, All) {
|
|||
{ DeleteBinarySet(binary_set); }
|
||||
}
|
||||
|
||||
delete[] (char*)(str_ds->GetTensor());
|
||||
delete[](char*)(str_ds->GetTensor());
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
#include "indexbuilder/VecIndexCreator.h"
|
||||
#include "common/QueryResult.h"
|
||||
#include "test_utils/indexbuilder_test_utils.h"
|
||||
#include "test_utils/storage_test_utils.h"
|
||||
|
||||
using namespace milvus;
|
||||
using namespace milvus::segcore;
|
||||
|
|
|
@ -23,11 +23,11 @@
|
|||
#include "common/QueryResult.h"
|
||||
#include "segcore/Types.h"
|
||||
#include "test_utils/indexbuilder_test_utils.h"
|
||||
#include "test_utils/storage_test_utils.h"
|
||||
#include "test_utils/DataGen.h"
|
||||
#include "test_utils/Timer.h"
|
||||
#include "storage/Util.h"
|
||||
|
||||
using namespace boost::filesystem;
|
||||
using namespace milvus;
|
||||
using namespace milvus::segcore;
|
||||
|
||||
|
|
|
@ -16,8 +16,6 @@
|
|||
#include <limits>
|
||||
#include <cmath>
|
||||
#include <google/protobuf/text_format.h>
|
||||
#include <boost/filesystem.hpp>
|
||||
#include <yaml-cpp/yaml.h>
|
||||
|
||||
#include "DataGen.h"
|
||||
#include "index/ScalarIndex.h"
|
||||
|
@ -40,176 +38,9 @@ using milvus::indexbuilder::ScalarIndexCreator;
|
|||
using ScalarTestParams = std::pair<MapParams, MapParams>;
|
||||
using milvus::index::ScalarIndexPtr;
|
||||
using milvus::index::StringIndexPtr;
|
||||
using milvus::storage::StorageConfig;
|
||||
using namespace boost::filesystem;
|
||||
|
||||
namespace {
|
||||
|
||||
bool
|
||||
find_file(const path& dir, const std::string& file_name, path& path_found) {
|
||||
const recursive_directory_iterator end;
|
||||
boost::system::error_code err;
|
||||
auto iter = recursive_directory_iterator(dir, err);
|
||||
while (iter != end) {
|
||||
try {
|
||||
if ((*iter).path().filename() == file_name) {
|
||||
path_found = (*iter).path();
|
||||
return true;
|
||||
}
|
||||
iter++;
|
||||
} catch (std::exception& e) {
|
||||
LOG_SEGCORE_ERROR_ << e.what();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
StorageConfig
|
||||
get_default_local_storage_config() {
|
||||
StorageConfig storage_config;
|
||||
storage_config.storage_type = "local";
|
||||
storage_config.root_path = TestRemotePath;
|
||||
return storage_config;
|
||||
}
|
||||
|
||||
StorageConfig
|
||||
get_default_remote_storage_config() {
|
||||
char testPath[100];
|
||||
auto pwd = std::string(getcwd(testPath, sizeof(testPath)));
|
||||
path filepath;
|
||||
auto currentPath = path(pwd);
|
||||
while (!find_file(currentPath, "milvus.yaml", filepath)) {
|
||||
currentPath = currentPath.append("../");
|
||||
}
|
||||
auto configPath = filepath.string();
|
||||
YAML::Node config;
|
||||
config = YAML::LoadFile(configPath);
|
||||
auto minioConfig = config["minio"];
|
||||
auto address = minioConfig["address"].as<std::string>();
|
||||
auto port = minioConfig["port"].as<std::string>();
|
||||
auto endpoint = address + ":" + port;
|
||||
auto accessKey = minioConfig["accessKeyID"].as<std::string>();
|
||||
auto accessValue = minioConfig["secretAccessKey"].as<std::string>();
|
||||
auto rootPath = minioConfig["rootPath"].as<std::string>();
|
||||
auto useSSL = minioConfig["useSSL"].as<bool>();
|
||||
auto useIam = minioConfig["useIAM"].as<bool>();
|
||||
auto iamEndPoint = minioConfig["iamEndpoint"].as<std::string>();
|
||||
auto logLevel = minioConfig["logLevel"].as<std::string>();
|
||||
auto bucketName = minioConfig["bucketName"].as<std::string>();
|
||||
auto useVirHost = minioConfig["useVirtualHost"].as<bool>();
|
||||
auto region = minioConfig["region"].as<std::string>();
|
||||
|
||||
return StorageConfig{endpoint,
|
||||
bucketName,
|
||||
accessKey,
|
||||
accessValue,
|
||||
rootPath,
|
||||
"minio",
|
||||
iamEndPoint,
|
||||
logLevel,
|
||||
region,
|
||||
useSSL,
|
||||
useIam,
|
||||
useVirHost};
|
||||
}
|
||||
|
||||
void
|
||||
delete_cstorage_config(CStorageConfig config) {
|
||||
delete[] config.address;
|
||||
delete[] config.bucket_name;
|
||||
delete[] config.access_key_id;
|
||||
delete[] config.access_key_value;
|
||||
delete[] config.root_path;
|
||||
delete[] config.storage_type;
|
||||
delete[] config.iam_endpoint;
|
||||
delete[] config.region;
|
||||
}
|
||||
|
||||
class TestConfigWrapper {
|
||||
public:
|
||||
TestConfigWrapper() = default;
|
||||
|
||||
TestConfigWrapper(const TestConfigWrapper&) = delete;
|
||||
|
||||
TestConfigWrapper
|
||||
operator=(const TestConfigWrapper&) = delete;
|
||||
|
||||
~TestConfigWrapper() {
|
||||
delete_cstorage_config(config_);
|
||||
}
|
||||
|
||||
public:
|
||||
static TestConfigWrapper&
|
||||
GetInstance() {
|
||||
// thread-safe enough after c++11
|
||||
static TestConfigWrapper instance;
|
||||
return instance;
|
||||
}
|
||||
|
||||
CStorageConfig
|
||||
get_default_cstorage_config() {
|
||||
auto init = [&] { this->init_default_cstorage_config(); };
|
||||
call_once(once_, init);
|
||||
return config_;
|
||||
}
|
||||
|
||||
private:
|
||||
void
|
||||
init_default_cstorage_config() {
|
||||
char testPath[1000];
|
||||
auto pwd = std::string(getcwd(testPath, sizeof(testPath)));
|
||||
path filepath;
|
||||
auto currentPath = path(pwd);
|
||||
while (!find_file(currentPath, "milvus.yaml", filepath)) {
|
||||
currentPath = currentPath.append("../");
|
||||
}
|
||||
auto configPath = filepath.string();
|
||||
YAML::Node config;
|
||||
config = YAML::LoadFile(configPath);
|
||||
auto minioConfig = config["minio"];
|
||||
auto address = minioConfig["address"].as<std::string>();
|
||||
auto port = minioConfig["port"].as<std::string>();
|
||||
auto endpoint = address + ":" + port;
|
||||
auto accessKey = minioConfig["accessKeyID"].as<std::string>();
|
||||
auto accessValue = minioConfig["secretAccessKey"].as<std::string>();
|
||||
auto rootPath = minioConfig["rootPath"].as<std::string>();
|
||||
auto useSSL = minioConfig["useSSL"].as<bool>();
|
||||
auto useIam = minioConfig["useIAM"].as<bool>();
|
||||
auto iamEndPoint = minioConfig["iamEndpoint"].as<std::string>();
|
||||
auto bucketName = minioConfig["bucketName"].as<std::string>();
|
||||
std::string storage_type = "minio";
|
||||
|
||||
config_.address = new char[endpoint.length() + 1];
|
||||
config_.bucket_name = new char[bucketName.length() + 1];
|
||||
config_.access_key_id = new char[accessKey.length() + 1];
|
||||
config_.access_key_value = new char[accessValue.length() + 1];
|
||||
config_.root_path = new char[rootPath.length() + 1];
|
||||
config_.storage_type = new char[storage_type.length() + 1];
|
||||
config_.iam_endpoint = new char[iamEndPoint.length() + 1];
|
||||
config_.useSSL = useSSL;
|
||||
config_.useIAM = useIam;
|
||||
|
||||
strcpy(const_cast<char*>(config_.address), endpoint.c_str());
|
||||
strcpy(const_cast<char*>(config_.bucket_name), bucketName.c_str());
|
||||
strcpy(const_cast<char*>(config_.access_key_id), accessKey.c_str());
|
||||
strcpy(const_cast<char*>(config_.access_key_value),
|
||||
accessValue.c_str());
|
||||
strcpy(const_cast<char*>(config_.root_path), rootPath.c_str());
|
||||
strcpy(const_cast<char*>(config_.storage_type), storage_type.c_str());
|
||||
strcpy(const_cast<char*>(config_.iam_endpoint), iamEndPoint.c_str());
|
||||
}
|
||||
|
||||
private:
|
||||
CStorageConfig config_;
|
||||
std::once_flag once_;
|
||||
};
|
||||
|
||||
CStorageConfig
|
||||
get_default_cstorage_config() {
|
||||
return TestConfigWrapper::GetInstance().get_default_cstorage_config();
|
||||
}
|
||||
|
||||
auto
|
||||
generate_build_conf(const milvus::IndexType& index_type,
|
||||
const milvus::MetricType& metric_type) {
|
||||
|
|
|
@ -0,0 +1,101 @@
|
|||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "Constants.h"
|
||||
#include "DataGen.h"
|
||||
#include "common/Types.h"
|
||||
#include "common/LoadInfo.h"
|
||||
#include "storage/Types.h"
|
||||
#include "storage/InsertData.h"
|
||||
|
||||
using milvus::DataType;
|
||||
using milvus::FieldId;
|
||||
using milvus::segcore::GeneratedData;
|
||||
using milvus::storage::ChunkManagerPtr;
|
||||
using milvus::storage::FieldDataMeta;
|
||||
using milvus::storage::FieldDataPtr;
|
||||
using milvus::storage::InsertData;
|
||||
using milvus::storage::StorageConfig;
|
||||
|
||||
namespace {
|
||||
|
||||
// test remote chunk manager with local disk
|
||||
inline StorageConfig
|
||||
get_default_local_storage_config() {
|
||||
StorageConfig storage_config;
|
||||
storage_config.storage_type = "local";
|
||||
storage_config.root_path = TestRemotePath;
|
||||
return storage_config;
|
||||
}
|
||||
|
||||
inline LoadFieldDataInfo
|
||||
PrepareInsertBinlog(int64_t collection_id,
|
||||
int64_t partition_id,
|
||||
int64_t segment_id,
|
||||
const std::string& prefix,
|
||||
const GeneratedData& dataset,
|
||||
const ChunkManagerPtr cm) {
|
||||
LoadFieldDataInfo load_info;
|
||||
auto row_count = dataset.row_ids_.size();
|
||||
|
||||
auto SaveFieldData = [&](const FieldDataPtr field_data,
|
||||
const std::string& file,
|
||||
const int64_t field_id) {
|
||||
auto insert_data = std::make_shared<InsertData>(field_data);
|
||||
FieldDataMeta field_data_meta{
|
||||
collection_id, partition_id, segment_id, field_id};
|
||||
insert_data->SetFieldDataMeta(field_data_meta);
|
||||
auto serialized_insert_data = insert_data->serialize_to_remote_file();
|
||||
auto serialized_insert_size = serialized_insert_data.size();
|
||||
cm->Write(file, serialized_insert_data.data(), serialized_insert_size);
|
||||
|
||||
load_info.field_infos.emplace(
|
||||
field_id,
|
||||
FieldBinlogInfo{field_id,
|
||||
static_cast<int64_t>(row_count),
|
||||
std::vector<std::string>{file}});
|
||||
};
|
||||
|
||||
{
|
||||
auto field_data = std::make_shared<milvus::storage::FieldData<int64_t>>(
|
||||
DataType::INT64);
|
||||
field_data->FillFieldData(dataset.row_ids_.data(), row_count);
|
||||
auto path = prefix + "/" + std::to_string(RowFieldID.get());
|
||||
SaveFieldData(field_data, path, RowFieldID.get());
|
||||
}
|
||||
{
|
||||
auto field_data = std::make_shared<milvus::storage::FieldData<int64_t>>(
|
||||
DataType::INT64);
|
||||
field_data->FillFieldData(dataset.timestamps_.data(), row_count);
|
||||
auto path = prefix + "/" + std::to_string(TimestampFieldID.get());
|
||||
SaveFieldData(field_data, path, TimestampFieldID.get());
|
||||
}
|
||||
auto fields = dataset.schema_->get_fields();
|
||||
for (auto& data : dataset.raw_->fields_data()) {
|
||||
int64_t field_id = data.field_id();
|
||||
auto field_meta = fields.at(FieldId(field_id));
|
||||
auto field_data = milvus::segcore::CreateFieldDataFromDataArray(
|
||||
row_count, &data, field_meta);
|
||||
auto path = prefix + "/" + std::to_string(field_id);
|
||||
SaveFieldData(field_data, path, field_id);
|
||||
}
|
||||
|
||||
return load_info;
|
||||
}
|
||||
|
||||
} // namespace
|
Loading…
Reference in New Issue