enhance: add documents in batch for json key stats (#40898)

issue: #40897

After this, the document add operations scheduling duration is
**decreased** roughly from **6s to 0.9s** for the case in the issue.

---------

Signed-off-by: SpadeA <tangchenjie1210@gmail.com>
pull/41052/head
Spade A 2025-04-02 08:22:24 +08:00 committed by GitHub
parent c32259ead7
commit 28c1ab8a16
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 358 additions and 26 deletions

View File

@ -58,11 +58,18 @@ JsonKeyStatsInvertedIndex::AddJSONEncodeValue(
void
JsonKeyStatsInvertedIndex::AddInvertedRecord(
std::map<std::string, std::vector<int64_t>>& mp) {
std::vector<uintptr_t> json_offsets_lens;
std::vector<const char*> keys;
std::vector<const int64_t*> json_offsets;
for (auto& iter : mp) {
for (auto value : iter.second) {
wrapper_->add_multi_data<std::string>(&iter.first, 1, value);
}
keys.push_back(iter.first.c_str());
json_offsets.push_back(iter.second.data());
json_offsets_lens.push_back(iter.second.size());
}
wrapper_->add_json_key_stats_data_by_batch(keys.data(),
json_offsets.data(),
json_offsets_lens.data(),
keys.size());
}
void
@ -305,6 +312,20 @@ JsonKeyStatsInvertedIndex::JsonKeyStatsInvertedIndex(
IndexStatsPtr
JsonKeyStatsInvertedIndex::Upload(const Config& config) {
finish();
index_build_timestamps_.index_build_done_ =
std::chrono::system_clock::now();
LOG_INFO(
"build json key index done for field id:{}, json parse duration: {}s, "
"tantivy document add schedule duration : {}s, "
"tantivy total duration : {}s, "
"total duration : {}s",
field_id_,
index_build_timestamps_.getJsonParsingDuration(),
index_build_timestamps_.getTantivyAddSchedulingDuration(),
index_build_timestamps_.getTantivyTotalDuration(),
index_build_timestamps_.getIndexBuildTotalDuration());
boost::filesystem::path p(path_);
boost::filesystem::directory_iterator end_iter;
@ -384,6 +405,8 @@ JsonKeyStatsInvertedIndex::BuildWithFieldData(
const std::vector<FieldDataPtr>& field_datas, bool nullable) {
int64_t offset = 0;
std::map<std::string, std::vector<int64_t>> mp;
index_build_timestamps_.index_build_begin_ =
std::chrono::system_clock::now();
if (nullable) {
for (const auto& data : field_datas) {
auto n = data->get_num_rows();
@ -410,8 +433,13 @@ JsonKeyStatsInvertedIndex::BuildWithFieldData(
}
}
}
index_build_timestamps_.tantivy_build_begin_ =
std::chrono::system_clock::now();
// Schedule all document add operations to tantivy.
AddInvertedRecord(mp);
LOG_INFO("build json key index done for field id:{}", field_id_);
index_build_timestamps_.tantivy_add_schedule_end_ =
std::chrono::system_clock::now();
}
void

View File

@ -293,5 +293,44 @@ class JsonKeyStatsInvertedIndex : public InvertedIndexTantivy<std::string> {
std::atomic<stdclock::time_point> last_commit_time_;
int64_t commit_interval_in_ms_;
std::atomic<bool> is_data_uncommitted_ = false;
struct IndexBuildTimestamps {
std::chrono::time_point<std::chrono::system_clock> index_build_begin_;
std::chrono::time_point<std::chrono::system_clock> tantivy_build_begin_;
// The time that we have finished push add operations to tantivy, which will be
// executed asynchronously.
std::chrono::time_point<std::chrono::system_clock>
tantivy_add_schedule_end_;
std::chrono::time_point<std::chrono::system_clock> index_build_done_;
auto
getJsonParsingDuration() const {
return std::chrono::duration<double>(tantivy_build_begin_ -
index_build_begin_)
.count();
}
auto
getTantivyAddSchedulingDuration() const {
return std::chrono::duration<double>(tantivy_add_schedule_end_ -
tantivy_build_begin_)
.count();
}
auto
getTantivyTotalDuration() const {
return std::chrono::duration<double>(index_build_done_ -
tantivy_build_begin_)
.count();
}
auto
getIndexBuildTotalDuration() const {
return std::chrono::duration<double>(index_build_done_ -
index_build_begin_)
.count();
}
};
IndexBuildTimestamps index_build_timestamps_;
};
} // namespace milvus::index

View File

@ -1,6 +1,6 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
version = 4
[[package]]
name = "addr2line"
@ -379,12 +379,12 @@ dependencies = [
[[package]]
name = "errno"
version = "0.3.9"
version = "0.3.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "534c5cf6194dfab3db3242765c03bbe257cf92f22b38f6bc0c58d59108a820ba"
checksum = "33d852cb9b869c2a9b3df2f71a3074817f01e1844f839a144f5fcef059a4eb5d"
dependencies = [
"libc",
"windows-sys 0.52.0",
"windows-sys 0.59.0",
]
[[package]]
@ -432,10 +432,16 @@ version = "0.6.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2eeb4ed9e12f43b7fa0baae3f9cdda28352770132ef2e09a23760c29cae8bd47"
dependencies = [
"rustix",
"rustix 0.38.41",
"windows-sys 0.48.0",
]
[[package]]
name = "fuchsia-cprng"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba"
[[package]]
name = "futures"
version = "0.3.31"
@ -542,7 +548,19 @@ checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7"
dependencies = [
"cfg-if",
"libc",
"wasi",
"wasi 0.11.0+wasi-snapshot-preview1",
]
[[package]]
name = "getrandom"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73fea8450eea4bac3940448fb7ae50d91f034f941199fcd9d909a5a07aa455f0"
dependencies = [
"cfg-if",
"libc",
"r-efi",
"wasi 0.14.2+wasi-0.2.4",
]
[[package]]
@ -690,9 +708,9 @@ checksum = "0c2cdeb66e45e9f36bfad5bbdb4d2384e70936afbee843c6f6543f0c551ebb25"
[[package]]
name = "libc"
version = "0.2.164"
version = "0.2.171"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "433bfe06b8c75da9b2e3fbea6e5329ff87748f0b144ef75306e674c3f6f7c13f"
checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6"
[[package]]
name = "linux-raw-sys"
@ -700,6 +718,12 @@ version = "0.4.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89"
[[package]]
name = "linux-raw-sys"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe7db12097d22ec582439daf8618b8fdd1a7bef6270e9af3b1ebcd30893cf413"
[[package]]
name = "log"
version = "0.4.22"
@ -860,7 +884,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48e4cc64c2ad9ebe670cb8fd69dd50ae301650392e81c05f9bfcb2d5bdbc24b0"
dependencies = [
"phf_shared",
"rand",
"rand 0.8.5",
]
[[package]]
@ -914,15 +938,59 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "r-efi"
version = "5.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5"
[[package]]
name = "rand"
version = "0.3.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "64ac302d8f83c0c1974bf758f6b041c6c8ada916fbb44a609158ca8b064cc76c"
dependencies = [
"libc",
"rand 0.4.6",
]
[[package]]
name = "rand"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293"
dependencies = [
"fuchsia-cprng",
"libc",
"rand_core 0.3.1",
"rdrand",
"winapi",
]
[[package]]
name = "rand"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"rand_core",
"rand_core 0.6.4",
]
[[package]]
name = "rand_core"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b"
dependencies = [
"rand_core 0.4.2",
]
[[package]]
name = "rand_core"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc"
[[package]]
name = "rand_core"
version = "0.6.4"
@ -949,6 +1017,15 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "rdrand"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2"
dependencies = [
"rand_core 0.3.1",
]
[[package]]
name = "regex"
version = "1.11.1"
@ -1015,10 +1092,23 @@ dependencies = [
"bitflags 2.6.0",
"errno",
"libc",
"linux-raw-sys",
"linux-raw-sys 0.4.14",
"windows-sys 0.52.0",
]
[[package]]
name = "rustix"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e56a18552996ac8d29ecc3b190b4fdbb2d91ca4ec396de7bbffaf43f3d637e96"
dependencies = [
"bitflags 2.6.0",
"errno",
"libc",
"linux-raw-sys 0.9.3",
"windows-sys 0.59.0",
]
[[package]]
name = "ryu"
version = "1.0.18"
@ -1200,10 +1290,12 @@ dependencies = [
"lazy_static",
"libc",
"log",
"rand 0.3.23",
"regex",
"scopeguard",
"serde_json",
"tantivy",
"tempfile",
"zstd-sys",
]
@ -1290,14 +1382,14 @@ dependencies = [
[[package]]
name = "tempfile"
version = "3.14.0"
version = "3.19.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28cce251fcbc87fac86a866eeb0d6c2d536fc16d06f184bb61aeae11aa4cee0c"
checksum = "7437ac7763b9b123ccf33c338a5cc1bac6f69b45a136c19bdd8a65e3916435bf"
dependencies = [
"cfg-if",
"fastrand",
"getrandom 0.3.2",
"once_cell",
"rustix",
"rustix 1.0.3",
"windows-sys 0.59.0",
]
@ -1410,7 +1502,7 @@ version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a"
dependencies = [
"getrandom",
"getrandom 0.2.15",
"serde",
]
@ -1426,6 +1518,15 @@ version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "wasi"
version = "0.14.2+wasi-0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9683f9a5a998d873c0d21fcbe3c083009670149a8fab228644b8bd36b2c48cb3"
dependencies = [
"wit-bindgen-rt",
]
[[package]]
name = "wasm-bindgen"
version = "0.2.95"
@ -1670,6 +1771,15 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
[[package]]
name = "wit-bindgen-rt"
version = "0.39.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1"
dependencies = [
"bitflags 2.6.0",
]
[[package]]
name = "zerocopy"
version = "0.7.35"

View File

@ -19,6 +19,10 @@ jieba-rs = "0.6.8"
regex = "1.11.1"
either = "1.13.0"
[dev-dependencies]
rand = "0.3"
tempfile = "3.19.1"
[build-dependencies]
cbindgen = "0.26.0"

View File

@ -245,6 +245,12 @@ RustResult tantivy_index_add_bools(void *ptr,
uintptr_t len,
int64_t offset_begin);
RustResult tantivy_index_add_json_key_stats_data_by_batch(void *ptr,
const char *const *keys,
const int64_t *const *json_offsets,
const uintptr_t *json_offsets_len,
uintptr_t len);
RustResult tantivy_index_add_bools_by_single_segment_writer(void *ptr,
const bool *array,
uintptr_t len);

View File

@ -320,8 +320,8 @@ mod test {
use tantivy::{
doc,
schema::{self, Schema, STORED, STRING, TEXT},
Index, IndexWriter,
schema::{Schema, STORED, STRING},
Index,
};
use super::IndexReaderWrapper;

View File

@ -8,14 +8,16 @@ use log::info;
use tantivy::schema::{
Field, IndexRecordOption, Schema, SchemaBuilder, TextFieldIndexing, TextOptions, FAST, INDEXED,
};
use tantivy::{doc, Document, Index, IndexWriter, SingleSegmentIndexWriter};
use tantivy::{doc, Document, Index, IndexWriter, SingleSegmentIndexWriter, UserOperation};
use crate::data_type::TantivyDataType;
use crate::error::Result;
use crate::error::{Result, TantivyBindingError};
use crate::index_reader::IndexReaderWrapper;
use crate::log::init_log;
const BATCH_SIZE: usize = 4096;
pub(crate) struct IndexWriterWrapper {
pub(crate) field: Field,
pub(crate) index_writer: Either<IndexWriter, SingleSegmentIndexWriter>,
@ -56,7 +58,10 @@ impl IndexWriterWrapper {
in_ram: bool,
) -> Result<IndexWriterWrapper> {
init_log();
info!("create index writer, field_name: {}, data_type: {:?}", field_name, data_type);
info!(
"create index writer, field_name: {}, data_type: {:?}, num threads {}",
field_name, data_type, num_threads
);
let mut schema_builder = Schema::builder();
let field = schema_builder_add_field(&mut schema_builder, &field_name, data_type);
// We cannot build direct connection from rows in multi-segments to milvus row data. So we have this doc_id field.
@ -83,7 +88,10 @@ impl IndexWriterWrapper {
path: String,
) -> Result<IndexWriterWrapper> {
init_log();
info!("create single segment index writer, field_name: {}, data_type: {:?}", field_name, data_type);
info!(
"create single segment index writer, field_name: {}, data_type: {:?}",
field_name, data_type
);
let mut schema_builder = Schema::builder();
let field = schema_builder_add_field(&mut schema_builder, &field_name, data_type);
let schema = schema_builder.build();
@ -169,6 +177,50 @@ impl IndexWriterWrapper {
))
}
// add in batch within BATCH_SIZE
pub fn add_json_key_stats(
&mut self,
keys: &[*const i8],
json_offsets: &[*const i64],
json_offsets_len: &[usize],
) -> Result<()> {
let writer = self.index_writer.as_ref().left().unwrap();
let id_field = self.id_field.unwrap();
let mut batch = Vec::with_capacity(BATCH_SIZE);
keys.iter()
.zip(json_offsets.iter())
.zip(json_offsets_len.iter())
.try_for_each(|((key, json_offsets), json_offsets_len)| -> Result<()> {
let key = unsafe { CStr::from_ptr(*key) }
.to_str()
.map_err(|e| TantivyBindingError::InternalError(e.to_string()))?;
let json_offsets =
unsafe { std::slice::from_raw_parts(*json_offsets, *json_offsets_len) };
for offset in json_offsets {
batch.push(UserOperation::Add(doc!(
id_field => *offset,
self.field => key,
)));
if batch.len() >= BATCH_SIZE {
writer.run(std::mem::replace(
&mut batch,
Vec::with_capacity(BATCH_SIZE),
))?;
}
}
Ok(())
})?;
if !batch.is_empty() {
writer.run(batch)?;
}
Ok(())
}
pub fn add_multi_i8s(&mut self, datas: &[i8], offset: i64) -> Result<()> {
let mut document = Document::default();
for data in datas {
@ -383,3 +435,63 @@ impl IndexWriterWrapper {
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::ffi::CString;
use rand::Rng;
use tempfile::tempdir;
#[test]
pub fn test_add_json_key_stats() {
use crate::data_type::TantivyDataType;
use crate::index_writer::IndexWriterWrapper;
let temp_dir = tempdir().unwrap();
let mut index_writer = IndexWriterWrapper::new(
"test".to_string(),
TantivyDataType::Keyword,
temp_dir.path().to_str().unwrap().to_string(),
1,
15 * 1024 * 1024,
false,
)
.unwrap();
let keys = (0..100).map(|i| format!("key{:05}", i)).collect::<Vec<_>>();
let mut total_count = 0;
let mut rng = rand::thread_rng();
let json_offsets = (0..100)
.map(|_| {
let count = rng.gen_range(0, 1000);
total_count += count;
(0..count)
.map(|_| rng.gen_range(0, i64::MAX))
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();
let json_offsets_len = json_offsets
.iter()
.map(|offsets| offsets.len())
.collect::<Vec<_>>();
let json_offsets = json_offsets.iter().map(|x| x.as_ptr()).collect::<Vec<_>>();
let c_keys: Vec<CString> = keys.into_iter().map(|k| CString::new(k).unwrap()).collect();
let key_ptrs: Vec<*const libc::c_char> = c_keys.iter().map(|cs| cs.as_ptr()).collect();
index_writer
.add_json_key_stats(&key_ptrs, &json_offsets, &json_offsets_len)
.unwrap();
index_writer.commit().unwrap();
let count: u32 = index_writer
.index
.load_metas()
.unwrap()
.segments
.iter()
.map(|s| s.max_doc())
.sum();
assert_eq!(count, total_count);
}
}

View File

@ -317,6 +317,25 @@ pub extern "C" fn tantivy_index_add_bools(
}
}
#[no_mangle]
pub extern "C" fn tantivy_index_add_json_key_stats_data_by_batch(
ptr: *mut c_void,
keys: *const *const c_char,
json_offsets: *const *const i64,
json_offsets_len: *const usize,
len: usize,
) -> RustResult {
let real = ptr as *mut IndexWriterWrapper;
let json_offsets_len = unsafe { slice::from_raw_parts(json_offsets_len, len) };
let json_offsets = unsafe { slice::from_raw_parts(json_offsets, len) };
let keys = unsafe { slice::from_raw_parts(keys, len) };
unsafe {
(*real)
.add_json_key_stats(keys, json_offsets, json_offsets_len)
.into()
}
}
#[no_mangle]
pub extern "C" fn tantivy_index_add_bools_by_single_segment_writer(
ptr: *mut c_void,

View File

@ -263,6 +263,20 @@ struct TantivyIndexWrapper {
typeid(T).name());
}
void
add_json_key_stats_data_by_batch(const char* const* keys,
const int64_t* const* json_offsets,
const uintptr_t* json_offsets_lens,
uintptr_t len_of_lens) {
assert(!finished_);
auto res =
RustResultWrapper(tantivy_index_add_json_key_stats_data_by_batch(
writer_, keys, json_offsets, json_offsets_lens, len_of_lens));
AssertInfo(res.result_->success,
"failed to add json key stats: {}",
res.result_->error);
}
template <typename T>
void
add_multi_data(const T* array, uintptr_t len, int64_t offset) {