mirror of https://github.com/milvus-io/milvus.git
fix: remove single segment logic in V7 (#41159)
Ref: https://github.com/milvus-io/milvus/issues/40823

It makes no sense to create a single-segment tantivy index for an old version such as 2.4 using tantivy V7, since single-segment indexes exist only for readers that predate V7. So this commit removes the relevant code.

---------

Signed-off-by: SpadeA <tangchenjie1210@gmail.com>

pull/41191/head
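For orientation, the net effect on the single-segment entry point, sketched from the hunks below (the version parameter disappears and V5 is hard-wired; surrounding module layout abbreviated, not the exact file contents):

    // Minimal sketch of the dispatch after this commit.
    pub fn new_with_single_segment(
        field_name: &str,
        data_type: TantivyDataType,
        path: String,
    ) -> Result<IndexWriterWrapper> {
        init_log();
        // Single-segment indexes only ever target old (V5) readers,
        // so there is nothing left to dispatch on.
        let writer = index_writer_v5::IndexWriterWrapperImpl::new_with_single_segment(
            field_name, data_type, path,
        )?;
        Ok(IndexWriterWrapper::V5(writer))
    }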
parent bbaa3c71f4
commit e9fa30f462
@@ -184,8 +184,7 @@ RustResult tantivy_create_index(const char *field_name,
 RustResult tantivy_create_index_with_single_segment(const char *field_name,
                                                     TantivyDataType data_type,
-                                                    const char *path,
-                                                    uint32_t tantivy_index_version);
+                                                    const char *path);
 
 void tantivy_free_index_writer(void *ptr);
@@ -59,23 +59,12 @@ impl IndexWriterWrapper {
         field_name: &str,
         data_type: TantivyDataType,
         path: String,
-        tanviy_index_version: TantivyIndexVersion,
     ) -> Result<IndexWriterWrapper> {
         init_log();
-        match tanviy_index_version {
-            TantivyIndexVersion::V5 => {
-                let writer = index_writer_v5::IndexWriterWrapperImpl::new_with_single_segment(
-                    field_name, data_type, path,
-                )?;
-                Ok(IndexWriterWrapper::V5(writer))
-            }
-            TantivyIndexVersion::V7 => {
-                let writer = index_writer_v7::IndexWriterWrapperImpl::new_with_single_segment(
-                    field_name, data_type, path,
-                )?;
-                Ok(IndexWriterWrapper::V7(writer))
-            }
-        }
+        let writer = index_writer_v5::IndexWriterWrapperImpl::new_with_single_segment(
+            field_name, data_type, path,
+        )?;
+        Ok(IndexWriterWrapper::V5(writer))
     }
 
     pub fn create_reader(&self) -> Result<IndexReaderWrapper> {
@@ -97,7 +86,7 @@ impl IndexWriterWrapper {
     {
         match self {
             IndexWriterWrapper::V5(writer) => writer.add_data_by_batch(data, offset),
-            IndexWriterWrapper::V7(writer) => writer.add_data_by_batch(data, offset),
+            IndexWriterWrapper::V7(writer) => writer.add_data_by_batch(data, offset.unwrap()),
         }
     }
 
@@ -108,7 +97,7 @@ impl IndexWriterWrapper {
     {
         match self {
             IndexWriterWrapper::V5(writer) => writer.add_array(data, offset),
-            IndexWriterWrapper::V7(writer) => writer.add_array(data, offset),
+            IndexWriterWrapper::V7(writer) => writer.add_array(data, offset.unwrap()),
         }
     }
 
@@ -119,7 +108,7 @@ impl IndexWriterWrapper {
     ) -> Result<()> {
         match self {
             IndexWriterWrapper::V5(writer) => writer.add_string_by_batch(data, offset),
-            IndexWriterWrapper::V7(writer) => writer.add_string_by_batch(data, offset),
+            IndexWriterWrapper::V7(writer) => writer.add_string_by_batch(data, offset.unwrap()),
         }
     }
 
@@ -130,7 +119,7 @@ impl IndexWriterWrapper {
     ) -> Result<()> {
         match self {
             IndexWriterWrapper::V5(writer) => writer.add_array_keywords(datas, offset),
-            IndexWriterWrapper::V7(writer) => writer.add_array_keywords(datas, offset),
+            IndexWriterWrapper::V7(writer) => writer.add_array_keywords(datas, offset.unwrap()),
         }
     }
 
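The four dispatch hunks above move the `Option<i64>` unwrap to the version boundary: V5 keeps the `Option` because its single-segment writer has no id field and ignores the offset, while V7 now always requires a concrete offset. A standalone sketch of that pattern, with hypothetical names (not code from this commit):

    // Keep Option at the shared API surface; convert to a concrete value in
    // the arm whose backend requires one.
    enum Writer {
        V5, // may run without an offset (single-segment mode has no id field)
        V7, // always stamps an id field, so an offset is mandatory
    }

    fn add(w: &Writer, offset: Option<i64>) -> i64 {
        match w {
            Writer::V5 => offset.unwrap_or(-1), // placeholder: V5 tolerates None
            Writer::V7 => offset.expect("V7 writers require an offset"),
        }
    }

    fn main() {
        assert_eq!(add(&Writer::V5, None), -1);
        assert_eq!(add(&Writer::V7, Some(7)), 7);
    }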
@@ -223,7 +212,6 @@ mod tests {
             field_name,
             data_type,
             dir.path().to_str().unwrap().to_string(),
-            TantivyIndexVersion::V5,
         )
         .unwrap();
 
@@ -55,21 +55,14 @@ pub extern "C" fn tantivy_create_index_with_single_segment(
     field_name: *const c_char,
     data_type: TantivyDataType,
     path: *const c_char,
-    tantivy_index_version: u32,
 ) -> RustResult {
     let field_name_str = cstr_to_str!(field_name);
     let path_str = cstr_to_str!(path);
 
-    let tantivy_index_version = match TantivyIndexVersion::from_u32(tantivy_index_version) {
-        Ok(v) => v,
-        Err(e) => return RustResult::from_error(e.to_string()),
-    };
-
     match IndexWriterWrapper::new_with_single_segment(
         field_name_str,
         data_type,
         String::from(path_str),
-        tantivy_index_version,
     ) {
         Ok(wrapper) => RustResult::from_ptr(create_binding(wrapper)),
         Err(e) => RustResult::from_error(e.to_string()),
@@ -37,9 +37,9 @@ impl AnalyzerBuilder<'_> {
         return Ok(tokenizer.unwrap());
     }
 
-    Err(TantivyBindingError::InternalError(format!(
-        "tokenizer name should be string or dict"
-    )))
+    Err(TantivyBindingError::InternalError(
+        "tokenizer name should be string or dict".to_string(),
+    ))
 }
 
 fn add_custom_filter(
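This hunk and the ones that follow are clippy-style error-construction cleanups: `format!` with a bare string literal goes through the formatting machinery for nothing (clippy's `useless_format` lint), and `e.to_string()` inside a `format!` argument is redundant because `{}` already routes through `Display`. A minimal, runnable illustration of both:

    fn main() {
        // Before: format! with no placeholders (clippy: useless_format).
        let a = format!("tokenizer name should be string or dict");
        // After: a plain conversion produces the same string.
        let b = "tokenizer name should be string or dict".to_string();
        assert_eq!(a, b);

        let e = std::io::Error::new(std::io::ErrorKind::Other, "boom");
        // Before: explicit to_string() inside format! is redundant...
        let c = format!("create stemmer failed: {}", e.to_string());
        // ...because {} already calls Display on `e`.
        let d = format!("create stemmer failed: {}", e);
        assert_eq!(c, d);
    }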
@@ -89,7 +89,7 @@ fn get_decompounder_filter(params: &json::Map<String, json::Value>) -> Result<SystemFilter> {
         Ok(f) => Ok(SystemFilter::Decompounder(f)),
         Err(e) => Err(TantivyBindingError::InternalError(format!(
             "create decompounder failed: {}",
-            e.to_string()
+            e
         ))),
     }
 }
@@ -106,7 +106,7 @@ fn get_stemmer_filter(params: &json::Map<String, json::Value>) -> Result<SystemFilter> {
         Ok(language) => Ok(SystemFilter::Stemmer(Stemmer::new(language))),
         Err(e) => Err(TantivyBindingError::InternalError(format!(
             "create stemmer failed : {}",
-            e.to_string()
+            e
         ))),
     }
 }
@@ -106,17 +106,15 @@ fn fetch_lindera_kind(params: &json::Map<String, json::Value>) -> Result<DictionaryKind> {
     match params.get("dict_kind") {
         Some(val) => {
             if !val.is_string() {
-                return Err(TantivyBindingError::InvalidArgument(format!(
-                    "lindera tokenizer dict kind should be string"
-                )));
+                return Err(TantivyBindingError::InvalidArgument(
+                    "lindera tokenizer dict kind should be string".to_string(),
+                ));
             }
             val.as_str().unwrap().into_dict_kind()
         }
-        _ => {
-            return Err(TantivyBindingError::InvalidArgument(format!(
-                "lindera tokenizer dict_kind must be set"
-            )))
-        }
+        _ => Err(TantivyBindingError::InvalidArgument(
+            "lindera tokenizer dict_kind must be set".to_string(),
+        )),
     }
 }
 
@@ -24,9 +24,9 @@ pub fn lindera_builder(
     params: Option<&json::Map<String, json::Value>>,
 ) -> Result<TextAnalyzerBuilder> {
     if params.is_none() {
-        return Err(TantivyBindingError::InvalidArgument(format!(
-            "lindera tokenizer must be costum"
-        )));
+        return Err(TantivyBindingError::InvalidArgument(
+            "lindera tokenizer must be costum".to_string(),
+        ));
     }
     let tokenizer = LinderaTokenizer::from_json(params.unwrap())?;
     Ok(TextAnalyzer::builder(tokenizer).dynamic())
@@ -43,16 +43,16 @@ pub fn get_builder_with_tokenizer(params: &json::Value) -> Result<TextAnalyzerBuilder> {
     match m.get("type") {
         Some(val) => {
             if !val.is_string() {
-                return Err(TantivyBindingError::InvalidArgument(format!(
-                    "tokenizer type should be string"
-                )));
+                return Err(TantivyBindingError::InvalidArgument(
+                    "tokenizer type should be string".to_string(),
+                ));
             }
             name = val.as_str().unwrap();
         }
         _ => {
-            return Err(TantivyBindingError::InvalidArgument(format!(
-                "costum tokenizer must set type"
-            )))
+            return Err(TantivyBindingError::InvalidArgument(
+                "costum tokenizer must set type".to_string(),
+            ))
         }
     }
     params_map = Some(m);
@@ -16,7 +16,6 @@ use crate::data_type::TantivyDataType;
 
 use crate::error::{Result, TantivyBindingError};
 use crate::index_writer::TantivyValue;
-use crate::log::init_log;
 
 const BATCH_SIZE: usize = 4096;
 
@@ -42,7 +41,7 @@ fn schema_builder_add_field(
                 .set_tokenizer("raw")
                 .set_index_option(IndexRecordOption::Basic);
             let text_options = TextOptions::default().set_indexing_options(text_field_indexing);
-            schema_builder.add_text_field(&field_name, text_options)
+            schema_builder.add_text_field(field_name, text_options)
         }
         TantivyDataType::Text => {
             panic!("text should be indexed with analyzer");
@@ -131,7 +130,6 @@ impl IndexWriterWrapperImpl {
         data_type: TantivyDataType,
         path: String,
     ) -> Result<IndexWriterWrapperImpl> {
-        init_log();
         info!(
             "create single segment index writer, field_name: {}, data_type: {:?}, tantivy_index_version 5",
             field_name, data_type
@@ -157,10 +155,10 @@ impl IndexWriterWrapperImpl {
 
         match &mut self.index_writer {
             Either::Left(writer) => {
-                let _ = writer.add_document(document)?;
+                writer.add_document(document)?;
             }
             Either::Right(single_segment_writer) => {
-                let _ = single_segment_writer.add_document(document)?;
+                single_segment_writer.add_document(document)?;
             }
         }
         Ok(())
@@ -37,13 +37,12 @@ impl IndexWriterWrapperImpl {
         let tokenizer = create_analyzer(tokenizer_params)?;
 
         let (schema, field, id_field) = build_text_schema(field_name, tokenizer_name);
-        let index: Index;
-        if in_ram {
-            index = Index::create_in_ram(schema);
+        let index = if in_ram {
+            Index::create_in_ram(schema)
         } else {
-            index = Index::create_in_dir(path.to_string(), schema).unwrap();
-        }
-        index.tokenizers().register(&tokenizer_name, tokenizer);
+            Index::create_in_dir(path, schema).unwrap()
+        };
+        index.tokenizers().register(tokenizer_name, tokenizer);
         let index_writer = index
             .writer_with_num_threads(num_threads, overall_memory_budget_in_bytes)
             .unwrap();
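This rewrite (repeated in the V7 text writer below) uses the fact that `if` is an expression in Rust: instead of declaring `let index: Index;` and assigning in each branch, the branch itself produces the value, so `index` stays immutable and can never be read uninitialized. A minimal, runnable illustration of the shape:

    // Statement-style deferred initialization vs. expression-style.
    fn pick(in_ram: bool) -> Vec<u8> {
        // Before (shape only): `let v: Vec<u8>; if in_ram { v = ... } else { v = ... }`
        // After: the `if` produces the value directly.
        let v = if in_ram { Vec::new() } else { vec![0u8; 16] };
        v
    }

    fn main() {
        assert_eq!(pick(true).len(), 0);
        assert_eq!(pick(false).len(), 16);
    }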
@@ -1,7 +1,6 @@
 use std::ffi::CStr;
 use std::sync::Arc;
 
-use either::Either;
 use futures::executor::block_on;
 use libc::c_char;
 use log::info;
@@ -9,14 +8,13 @@ use tantivy::indexer::UserOperation;
 use tantivy::schema::{
     Field, IndexRecordOption, Schema, SchemaBuilder, TextFieldIndexing, TextOptions, FAST, INDEXED,
 };
-use tantivy::{doc, Index, IndexWriter, SingleSegmentIndexWriter, TantivyDocument};
+use tantivy::{doc, Index, IndexWriter, TantivyDocument};
 
 use crate::data_type::TantivyDataType;
 
 use crate::error::{Result, TantivyBindingError};
 use crate::index_reader::IndexReaderWrapper;
 use crate::index_writer::TantivyValue;
-use crate::log::init_log;
 
 const BATCH_SIZE: usize = 4096;
 
@@ -35,7 +33,7 @@ fn schema_builder_add_field(
                 .set_tokenizer("raw")
                 .set_index_option(IndexRecordOption::Basic);
             let text_options = TextOptions::default().set_indexing_options(text_field_indexing);
-            schema_builder.add_text_field(&field_name, text_options)
+            schema_builder.add_text_field(field_name, text_options)
         }
         TantivyDataType::Text => {
             panic!("text should be indexed with analyzer");
@@ -93,8 +91,8 @@ impl TantivyValue<TantivyDocument> for bool {
 
 pub struct IndexWriterWrapperImpl {
     pub(crate) field: Field,
-    pub(crate) index_writer: Either<IndexWriter, SingleSegmentIndexWriter>,
-    pub(crate) id_field: Option<Field>,
+    pub(crate) index_writer: IndexWriter,
+    pub(crate) id_field: Field,
     pub(crate) index: Arc<Index>,
 }
 
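With the single-segment path gone, the `Either<IndexWriter, SingleSegmentIndexWriter>` wrapper and the `Option<Field>` id field lose their second variants, which is what lets all the `as_ref().left().unwrap()` projections in the hunks below disappear. A standalone sketch of what `Either` was buying here, assuming the `either` crate (which this file used):

    use either::Either;

    struct TwoWriters {
        // Stand-ins for IndexWriter / SingleSegmentIndexWriter.
        w: Either<String, Vec<u8>>,
    }

    struct OneWriter {
        // Once only one variant remains, the wrapper is pure overhead.
        w: String,
    }

    fn main() {
        let two = TwoWriters { w: Either::Left("writer".to_string()) };
        // Every access needs a projection that can panic on the wrong side:
        let s = two.w.as_ref().left().unwrap();
        assert_eq!(s, "writer");

        let one = OneWriter { w: "writer".to_string() };
        assert_eq!(one.w, "writer"); // direct access, no unwrap
    }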
@@ -120,31 +118,8 @@ impl IndexWriterWrapperImpl {
         index.writer_with_num_threads(num_threads, overall_memory_budget_in_bytes)?;
         Ok(IndexWriterWrapperImpl {
             field,
-            index_writer: Either::Left(index_writer),
-            id_field: Some(id_field),
-            index: Arc::new(index),
-        })
-    }
-
-    pub fn new_with_single_segment(
-        field_name: &str,
-        data_type: TantivyDataType,
-        path: String,
-    ) -> Result<IndexWriterWrapperImpl> {
-        init_log();
-        info!(
-            "create single segment index writer, field_name: {}, data_type: {:?}, tantivy_index_version 7",
-            field_name, data_type
-        );
-        let mut schema_builder = Schema::builder();
-        let field = schema_builder_add_field(&mut schema_builder, field_name, data_type);
-        let schema = schema_builder.build();
-        let index = Index::create_in_dir(path.clone(), schema)?;
-        let index_writer = SingleSegmentIndexWriter::new(index.clone(), 15 * 1024 * 1024)?;
-        Ok(IndexWriterWrapperImpl {
-            field,
-            index_writer: Either::Right(index_writer),
-            id_field: None,
+            index_writer,
+            id_field,
             index: Arc::new(index),
         })
     }
@@ -154,51 +129,28 @@ impl IndexWriterWrapperImpl {
     }
 
     #[inline]
-    fn add_document(&mut self, mut document: TantivyDocument, offset: Option<i64>) -> Result<()> {
-        if let Some(id_field) = self.id_field {
-            document.add_i64(id_field, offset.unwrap());
-        }
-
-        match &mut self.index_writer {
-            Either::Left(writer) => {
-                let _ = writer.add_document(document)?;
-            }
-            Either::Right(single_segment_writer) => {
-                let _ = single_segment_writer.add_document(document)?;
-            }
-        }
+    fn add_document(&mut self, mut document: TantivyDocument, offset: i64) -> Result<()> {
+        document.add_i64(self.id_field, offset);
+        self.index_writer.add_document(document)?;
         Ok(())
     }
 
     pub fn add_data_by_batch<T: TantivyValue<TantivyDocument>>(
         &mut self,
         batch_data: &[T],
-        offset: Option<i64>,
-    ) -> Result<()> {
-        match &self.index_writer {
-            Either::Left(_) => self.add_datas(batch_data, offset.unwrap()),
-            Either::Right(_) => self.add_datas_by_single_segment(batch_data),
-        }
-    }
-
-    fn add_datas<T: TantivyValue<TantivyDocument>>(
-        &mut self,
-        batch_data: &[T],
         offset_begin: i64,
     ) -> Result<()> {
-        let writer = self.index_writer.as_ref().left().unwrap();
-        let id_field = self.id_field.unwrap();
         let mut batch = Vec::with_capacity(BATCH_SIZE);
         for (idx, data) in batch_data.into_iter().enumerate() {
             let offset = offset_begin + idx as i64;
 
             let mut doc = TantivyDocument::default();
             data.add_to_document(self.field.field_id(), &mut doc);
-            doc.add_i64(id_field, offset);
+            doc.add_i64(self.id_field, offset);
 
             batch.push(UserOperation::Add(doc));
             if batch.len() == BATCH_SIZE {
-                writer.run(std::mem::replace(
+                self.index_writer.run(std::mem::replace(
                     &mut batch,
                     Vec::with_capacity(BATCH_SIZE),
                 ))?;
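The batching loop survives the refactor unchanged: `std::mem::replace` hands the full batch to the writer and leaves a fresh, pre-sized `Vec` behind, so the loop never reallocates per element. A standalone, runnable sketch of that flush pattern (hypothetical `flush` stands in for `index_writer.run`):

    const BATCH_SIZE: usize = 4;

    fn flush(batch: Vec<i64>, flushed: &mut Vec<Vec<i64>>) {
        flushed.push(batch);
    }

    fn main() {
        let mut flushed = Vec::new();
        let mut batch = Vec::with_capacity(BATCH_SIZE);
        for i in 0..10 {
            batch.push(i);
            if batch.len() == BATCH_SIZE {
                // Swap out the full batch; `batch` is immediately reusable.
                flush(
                    std::mem::replace(&mut batch, Vec::with_capacity(BATCH_SIZE)),
                    &mut flushed,
                );
            }
        }
        if !batch.is_empty() {
            flush(batch, &mut flushed); // tail flush, same as the diff
        }
        assert_eq!(flushed.len(), 3); // two full batches of 4, one tail of 2
    }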
@@ -206,28 +158,16 @@ impl IndexWriterWrapperImpl {
         }
 
         if !batch.is_empty() {
-            writer.run(batch)?;
+            self.index_writer.run(batch)?;
         }
 
         Ok(())
     }
 
-    fn add_datas_by_single_segment<T: TantivyValue<TantivyDocument>>(
-        &mut self,
-        batch_data: &[T],
-    ) -> Result<()> {
-        for d in batch_data {
-            let mut document = TantivyDocument::default();
-            d.add_to_document(self.field.field_id(), &mut document);
-            self.add_document(document, None)?;
-        }
-        Ok(())
-    }
-
     pub fn add_array<T: TantivyValue<TantivyDocument>, I>(
         &mut self,
         data: I,
-        offset: Option<i64>,
+        offset: i64,
     ) -> Result<()>
     where
         I: IntoIterator<Item = T>,
@@ -239,11 +179,7 @@ impl IndexWriterWrapperImpl {
         self.add_document(document, offset)
     }
 
-    pub fn add_array_keywords(
-        &mut self,
-        datas: &[*const c_char],
-        offset: Option<i64>,
-    ) -> Result<()> {
+    pub fn add_array_keywords(&mut self, datas: &[*const c_char], offset: i64) -> Result<()> {
         let mut document = TantivyDocument::default();
         for element in datas {
             let data = unsafe { CStr::from_ptr(*element) };
@@ -253,20 +189,7 @@ impl IndexWriterWrapperImpl {
         self.add_document(document, offset)
     }
 
-    pub fn add_string_by_batch(
-        &mut self,
-        data: &[*const c_char],
-        offset: Option<i64>,
-    ) -> Result<()> {
-        match &self.index_writer {
-            Either::Left(_) => self.add_strings(data, offset.unwrap()),
-            Either::Right(_) => self.add_strings_by_single_segment(data),
-        }
-    }
-
-    fn add_strings(&mut self, data: &[*const c_char], offset: i64) -> Result<()> {
-        let writer = self.index_writer.as_ref().left().unwrap();
-        let id_field = self.id_field.unwrap();
+    pub fn add_string_by_batch(&mut self, data: &[*const c_char], offset: i64) -> Result<()> {
         let mut batch = Vec::with_capacity(BATCH_SIZE);
         for (idx, key) in data.into_iter().enumerate() {
             let key = unsafe { CStr::from_ptr(*key) }
@@ -274,11 +197,11 @@ impl IndexWriterWrapperImpl {
                 .map_err(|e| TantivyBindingError::InternalError(e.to_string()))?;
             let key_offset = offset + idx as i64;
             batch.push(UserOperation::Add(doc!(
-                id_field => key_offset,
+                self.id_field => key_offset,
                 self.field => key,
             )));
             if batch.len() >= BATCH_SIZE {
-                writer.run(std::mem::replace(
+                self.index_writer.run(std::mem::replace(
                     &mut batch,
                     Vec::with_capacity(BATCH_SIZE),
                 ))?;
@@ -286,53 +209,32 @@ impl IndexWriterWrapperImpl {
         }
 
         if !batch.is_empty() {
-            writer.run(batch)?;
+            self.index_writer.run(batch)?;
         }
 
         Ok(())
     }
 
-    fn add_strings_by_single_segment(&mut self, data: &[*const c_char]) -> Result<()> {
-        let writer = self.index_writer.as_mut().right().unwrap();
-        for key in data {
-            let key = unsafe { CStr::from_ptr(*key) }
-                .to_str()
-                .map_err(|e| TantivyBindingError::InternalError(e.to_string()))?;
-            writer.add_document(doc!(self.field => key))?;
-        }
-        Ok(())
-    }
-
     pub fn manual_merge(&mut self) -> Result<()> {
-        let index_writer = self.index_writer.as_mut().left().unwrap();
-        let metas = index_writer.index().searchable_segment_metas()?;
-        let policy = index_writer.get_merge_policy();
+        let metas = self.index_writer.index().searchable_segment_metas()?;
+        let policy = self.index_writer.get_merge_policy();
         let candidates = policy.compute_merge_candidates(metas.as_slice());
         for candidate in candidates {
-            index_writer.merge(candidate.0.as_slice()).wait()?;
+            self.index_writer.merge(candidate.0.as_slice()).wait()?;
         }
         Ok(())
     }
 
-    pub fn finish(self) -> Result<()> {
-        match self.index_writer {
-            Either::Left(mut index_writer) => {
-                index_writer.commit()?;
-                // self.manual_merge();
-                block_on(index_writer.garbage_collect_files())?;
-                index_writer.wait_merging_threads()?;
-            }
-            Either::Right(single_segment_index_writer) => {
-                single_segment_index_writer
-                    .finalize()
-                    .expect("failed to build inverted index");
-            }
-        }
+    pub fn finish(mut self) -> Result<()> {
+        self.index_writer.commit()?;
+        // self.manual_merge();
+        block_on(self.index_writer.garbage_collect_files())?;
+        self.index_writer.wait_merging_threads()?;
         Ok(())
     }
 
     pub(crate) fn commit(&mut self) -> Result<()> {
-        self.index_writer.as_mut().left().unwrap().commit()?;
+        self.index_writer.commit()?;
         Ok(())
     }
 }
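Note that `finish` still takes `self` by value (now `mut self`), so the writer is consumed by finalization and cannot be committed twice. A minimal sketch of that consuming-finalizer shape, with hypothetical types:

    struct Writer {
        committed: bool,
    }

    impl Writer {
        // Taking `self` by value moves the writer into finish();
        // the compiler rejects any later use.
        fn finish(mut self) -> bool {
            self.committed = true;
            self.committed // `self` is dropped when this returns
        }
    }

    fn main() {
        let w = Writer { committed: false };
        assert!(w.finish());
        // w.finish(); // would not compile: `w` was moved by the first call
    }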
@@ -1,6 +1,5 @@
 use std::sync::Arc;
 
-use either::Either;
 use tantivy::schema::{Field, IndexRecordOption, Schema, TextFieldIndexing, TextOptions, FAST};
 use tantivy::Index;
 
@@ -36,21 +35,20 @@ impl IndexWriterWrapperImpl {
         let tokenizer = create_analyzer(tokenizer_params)?;
 
         let (schema, field, id_field) = build_text_schema(field_name, tokenizer_name);
-        let index: Index;
-        if in_ram {
-            index = Index::create_in_ram(schema);
+        let index = if in_ram {
+            Index::create_in_ram(schema)
         } else {
-            index = Index::create_in_dir(path.to_string(), schema).unwrap();
-        }
-        index.tokenizers().register(&tokenizer_name, tokenizer);
+            Index::create_in_dir(path, schema).unwrap()
+        };
+        index.tokenizers().register(tokenizer_name, tokenizer);
         let index_writer = index
             .writer_with_num_threads(num_threads, overall_memory_budget_in_bytes)
             .unwrap();
 
         Ok(IndexWriterWrapperImpl {
             field,
-            index_writer: Either::Left(index_writer),
-            id_field: Some(id_field),
+            index_writer,
+            id_field,
             index: Arc::new(index),
         })
     }
@@ -2,10 +2,10 @@ use libc::{c_char, c_void};
 use tantivy::tokenizer::TextAnalyzer;
 
 use crate::{
+    analyzer::create_analyzer,
     array::RustResult,
     log::init_log,
     string_c::c_str_to_str,
-    analyzer::create_analyzer,
     util::{create_binding, free_binding},
 };
 
@@ -18,8 +18,7 @@ pub extern "C" fn tantivy_create_analyzer(analyzer_params: *const c_char) -> RustResult {
         Ok(text_analyzer) => RustResult::from_ptr(create_binding(text_analyzer)),
         Err(err) => RustResult::from_error(format!(
             "create tokenizer failed with error: {} param: {}",
-            err.to_string(),
-            params,
+            err, params,
         )),
     }
 }
@@ -89,8 +89,11 @@ struct TantivyIndexWrapper {
                   DEFAULT_OVERALL_MEMORY_BUDGET_IN_BYTES) {
         RustResultWrapper res;
         if (inverted_single_semgnent) {
+            AssertInfo(tantivy_index_version == 5,
+                       "TantivyIndexWrapper: inverted_single_semgnent only "
+                       "support tantivy 5");
             res = RustResultWrapper(tantivy_create_index_with_single_segment(
-                field_name, data_type, path, tantivy_index_version));
+                field_name, data_type, path));
         } else {
             res = RustResultWrapper(
                 tantivy_create_index(field_name,
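The version check thus moves from the Rust FFI (the removed `TantivyIndexVersion::from_u32` validation) into the C++ wrapper's `AssertInfo`. Expressed as a Rust assertion for comparison, this is the invariant being enforced (hypothetical sketch, not code from this commit):

    // The single-segment path is only ever valid for tantivy V5 readers;
    // any other requested version is a logic error upstream.
    fn create_single_segment_index(tantivy_index_version: u32) {
        assert_eq!(
            tantivy_index_version, 5,
            "single-segment index creation only supports tantivy 5"
        );
        // ... proceed with the version-free FFI call ...
    }

    fn main() {
        create_single_segment_index(5);
    }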
@@ -94,7 +94,6 @@ set(MILVUS_TEST_FILES
         test_chunked_column.cpp
         test_rust_result.cpp
         test_cached_search_iterator.cpp
-        test_build_inverted_index_with_single_segment.cpp
         test_random_sample.cpp
         test_json_index.cpp
 )
@@ -1,214 +0,0 @@
-// Copyright (C) 2019-2020 Zilliz. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software distributed under the License
-// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
-// or implied. See the License for the specific language governing permissions and limitations under the License
-
-#include <random>
-#include <gtest/gtest.h>
-
-#include "pb/plan.pb.h"
-#include "segcore/SegmentSealedImpl.h"
-#include "index/InvertedIndexTantivy.h"
-#include "test_utils/DataGen.h"
-#include "common/Schema.h"
-#include "test_utils/GenExprProto.h"
-#include "query/PlanProto.h"
-#include "query/ExecPlanNodeVisitor.h"
-
-using namespace milvus;
-using namespace milvus::query;
-using namespace milvus::segcore;
-
-template <typename T>
-SchemaPtr
-GenSchema() {
-    auto schema_ = std::make_shared<Schema>();
-    auto pk = schema_->AddDebugField("pk", DataType::INT64);
-    schema_->set_primary_field_id(pk);
-
-    if constexpr (std::is_same_v<T, bool>) {
-        schema_->AddDebugField("index", DataType::BOOL, false);
-    } else if constexpr (std::is_same_v<T, int8_t>) {
-        schema_->AddDebugField("index", DataType::INT8, false);
-    } else if constexpr (std::is_same_v<T, int16_t>) {
-        schema_->AddDebugField("index", DataType::INT16, false);
-    } else if constexpr (std::is_same_v<T, int32_t>) {
-        schema_->AddDebugField("index", DataType::INT32, false);
-    } else if constexpr (std::is_same_v<T, int64_t>) {
-        schema_->AddDebugField("index", DataType::INT64, false);
-    } else if constexpr (std::is_same_v<T, float>) {
-        schema_->AddDebugField("index", DataType::FLOAT, false);
-    } else if constexpr (std::is_same_v<T, double>) {
-        schema_->AddDebugField("index", DataType::DOUBLE, false);
-    } else if constexpr (std::is_same_v<T, std::string>) {
-        schema_->AddDebugField("index", DataType::VARCHAR, false);
-    }
-
-    return schema_;
-}
-
-template <typename T>
-class BuildInvertedIndexWithSingleSegmentTest : public ::testing::Test {
- public:
-    void
-    SetUp() override {
-        schema_ = GenSchema<T>();
-        seg_ = CreateSealedSegment(schema_);
-        N_ = 3000;
-        uint64_t seed = 1234;
-        auto raw_data = DataGen(schema_, N_, seed);
-
-        if constexpr (std::is_same_v<T, bool>) {
-            auto index_col =
-                raw_data.get_col(schema_->get_field_id(FieldName("index")))
-                    ->scalars()
-                    .bool_data()
-                    .data();
-            for (size_t i = 0; i < N_; i++) {
-                index_column_data_.push_back(index_col[i]);
-            }
-        } else if constexpr (std::is_same_v<T, int64_t>) {
-            auto index_col =
-                raw_data.get_col(schema_->get_field_id(FieldName("index")))
-                    ->scalars()
-                    .long_data()
-                    .data();
-            for (size_t i = 0; i < N_; i++) {
-                index_column_data_.push_back(index_col[i]);
-            }
-        } else if constexpr (std::is_integral_v<T>) {
-            auto index_col =
-                raw_data.get_col(schema_->get_field_id(FieldName("index")))
-                    ->scalars()
-                    .int_data()
-                    .data();
-            for (size_t i = 0; i < N_; i++) {
-                index_column_data_.push_back(index_col[i]);
-            }
-        } else if constexpr (std::is_same_v<T, float>) {
-            auto index_col =
-                raw_data.get_col(schema_->get_field_id(FieldName("index")))
-                    ->scalars()
-                    .float_data()
-                    .data();
-            for (size_t i = 0; i < N_; i++) {
-                index_column_data_.push_back(index_col[i]);
-            }
-        } else if constexpr (std::is_same_v<T, double>) {
-            auto index_col =
-                raw_data.get_col(schema_->get_field_id(FieldName("index")))
-                    ->scalars()
-                    .double_data()
-                    .data();
-            for (size_t i = 0; i < N_; i++) {
-                index_column_data_.push_back(index_col[i]);
-            }
-        } else if constexpr (std::is_same_v<T, std::string>) {
-            auto index_col =
-                raw_data.get_col(schema_->get_field_id(FieldName("index")))
-                    ->scalars()
-                    .string_data()
-                    .data();
-            for (size_t i = 0; i < N_; i++) {
-                index_column_data_.push_back(index_col[i]);
-            }
-        }
-        SealedLoadFieldData(raw_data, *seg_);
-        LoadInvertedIndex();
-    }
-
-    void
-    TearDown() override {
-    }
-
-    void
-    LoadInvertedIndex() {
-        auto index = std::make_unique<index::InvertedIndexTantivy<T>>();
-        Config cfg;
-        cfg[milvus::index::SCALAR_INDEX_ENGINE_VERSION] = 0;
-        index->BuildWithRawDataForUT(N_, index_column_data_.data(), cfg);
-        LoadIndexInfo info{
-            .field_id = schema_->get_field_id(FieldName("index")).get(),
-            .index = std::move(index),
-        };
-        seg_->LoadIndex(info);
-    }
-
-    T
-    FieldValueAt(int64_t offset) {
-        return index_column_data_[offset];
-    }
-
- public:
-    SchemaPtr schema_;
-    SegmentSealedUPtr seg_;
-    int64_t N_;
-    boost::container::vector<T> index_column_data_;
-};
-
-TYPED_TEST_SUITE_P(BuildInvertedIndexWithSingleSegmentTest);
-
-TYPED_TEST_P(BuildInvertedIndexWithSingleSegmentTest,
-             ReadFromSingleSegmentIndex) {
-    const auto& meta = this->schema_->operator[](FieldName("index"));
-    for (size_t i = 0; i < 10; i++) {
-        auto column_info = test::GenColumnInfo(
-            meta.get_id().get(),
-            static_cast<proto::schema::DataType>(meta.get_data_type()),
-            false,
-            false,
-            static_cast<proto::schema::DataType>(meta.get_element_type()));
-
-        std::random_device rd;
-        std::mt19937 gen(rd());
-
-        std::uniform_int_distribution<> int_dist(1, this->N_);
-        int random_idx = int_dist(gen) - 1;
-
-        auto unary_range_expr = std::make_unique<proto::plan::UnaryRangeExpr>();
-        unary_range_expr->set_allocated_column_info(column_info);
-        unary_range_expr->set_op(proto::plan::OpType::Equal);
-        auto val = this->FieldValueAt(random_idx);
-        unary_range_expr->set_allocated_value(test::GenGenericValue(val));
-
-        auto expr = test::GenExpr();
-        expr->set_allocated_unary_range_expr(unary_range_expr.release());
-        auto parser = ProtoParser(*this->schema_);
-        auto typed_expr = parser.ParseExprs(*expr);
-        auto parsed = std::make_shared<plan::FilterBitsNode>(
-            DEFAULT_PLANNODE_ID, typed_expr);
-        auto segpromote = dynamic_cast<SegmentSealedImpl*>(this->seg_.get());
-        BitsetType final;
-        final = ExecuteQueryExpr(parsed, segpromote, this->N_, MAX_TIMESTAMP);
-        auto ref = [this, random_idx](size_t offset) -> bool {
-            return this->index_column_data_[offset] ==
-                   this->index_column_data_[random_idx];
-        };
-        ASSERT_EQ(final.size(), this->N_);
-        for (size_t i = 0; i < this->N_; i++) {
-            if (std::is_floating_point_v<decltype(val)> && i == random_idx) {
-                continue;
-            }
-            ASSERT_EQ(final[i], ref(i))
-                << "i: " << i << ", final[i]: " << final[i]
-                << ", ref(i): " << ref(i) << ", random_idx: " << random_idx
-                << ", value: " << this->index_column_data_[random_idx]
-                << ", value: " << this->index_column_data_[i];
-        }
-    }
-}
-
-REGISTER_TYPED_TEST_CASE_P(BuildInvertedIndexWithSingleSegmentTest,
-                           ReadFromSingleSegmentIndex);
-
-using ElementType = testing::
-    Types<bool, int8_t, int16_t, int32_t, int64_t, float, double, std::string>;
-INSTANTIATE_TYPED_TEST_SUITE_P(Naive,
-                               BuildInvertedIndexWithSingleSegmentTest,
-                               ElementType);