enhance: Allow empty sparse row (#34700)

issue: #29419

* If a sparse vector with 0 non-zero value is inserted, no ANN search on
this sparse vector field will return it as a result. User may retrieve
this row via scalar query or ANN search on another vector field though.
* If the user uses an empty sparse vector as the query vector for an ANN
search, no neighbor will be returned.

Signed-off-by: Buqian Zheng <zhengbuqian@gmail.com>
pull/35517/head
Buqian Zheng 2024-08-16 14:14:54 +08:00 committed by GitHub
parent 1d49358f82
commit f4a91e135b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 145 additions and 37 deletions

View File

@ -224,7 +224,6 @@ CopyAndWrapSparseRow(const void* data,
knowhere::sparse::SparseRow<float> row(num_elements); knowhere::sparse::SparseRow<float> row(num_elements);
std::memcpy(row.data(), data, size); std::memcpy(row.data(), data, size);
if (validate) { if (validate) {
AssertInfo(size > 0, "Sparse row data should not be empty");
AssertInfo( AssertInfo(
size % knowhere::sparse::SparseRow<float>::element_size() == 0, size % knowhere::sparse::SparseRow<float>::element_size() == 0,
"Invalid size for sparse row data"); "Invalid size for sparse row data");

View File

@ -282,14 +282,14 @@ class IndexingRecord {
//Small-Index enabled, create index for vector field only //Small-Index enabled, create index for vector field only
if (index_meta_->GetIndexMaxRowCount() > 0 && if (index_meta_->GetIndexMaxRowCount() > 0 &&
index_meta_->HasFiled(field_id)) { index_meta_->HasFiled(field_id)) {
auto vec_filed_meta = auto vec_field_meta =
index_meta_->GetFieldIndexMeta(field_id); index_meta_->GetFieldIndexMeta(field_id);
//Disable growing index for flat //Disable growing index for flat
if (!vec_filed_meta.IsFlatIndex()) { if (!vec_field_meta.IsFlatIndex()) {
field_indexings_.try_emplace( field_indexings_.try_emplace(
field_id, field_id,
CreateIndex(field_meta, CreateIndex(field_meta,
vec_filed_meta, vec_field_meta,
index_meta_->GetIndexMaxRowCount(), index_meta_->GetIndexMaxRowCount(),
segcore_config_)); segcore_config_));
} }

View File

@ -671,6 +671,76 @@ TEST_P(IndexTest, GetVector) {
} }
} }
// This ut runs for sparse only. And will not use the default xb_sparse_dataset.
// It builds a 3-row sparse dataset that contains empty rows, round-trips the
// index through Upload/Load, and verifies GetSparseVector returns every row
// (including the empty ones) unchanged.
TEST_P(IndexTest, GetVector_EmptySparseVector) {
// Only meaningful for sparse index types; skip silently for all others.
if (index_type != knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX &&
index_type != knowhere::IndexEnum::INDEX_SPARSE_WAND) {
return;
}
NB = 3;
std::vector<knowhere::sparse::SparseRow<float>> vec;
vec.reserve(NB);
// row0 is a regular row with two non-zero entries
vec.emplace_back(2);
vec[0].set_at(0, 1, 1.0);
vec[0].set_at(1, 2, 2.0);
// row1 is an explicit empty row
vec.emplace_back(0);
// row2 is an implicit empty row (its single stored dim has a value of 0)
vec.emplace_back(1);
vec[2].set_at(0, 1, 0);
auto dataset = knowhere::GenDataSet(NB, 3, vec.data());
milvus::index::CreateIndexInfo create_index_info;
create_index_info.index_type = index_type;
create_index_info.metric_type = metric_type;
create_index_info.field_type = vec_field_data_type;
create_index_info.index_engine_version =
knowhere::Version::GetCurrentVersion().VersionNumber();
index::IndexBasePtr index;
milvus::storage::FieldDataMeta field_data_meta{1, 2, 3, 100};
milvus::storage::IndexMeta index_meta{3, 100, 1000, 1};
auto chunk_manager = milvus::storage::CreateChunkManager(storage_config_);
milvus::storage::FileManagerContext file_manager_context(
field_data_meta, index_meta, chunk_manager);
index = milvus::index::IndexFactory::GetInstance().CreateIndex(
create_index_info, file_manager_context);
// use custom dataset instead of xb_dataset
ASSERT_NO_THROW(index->BuildWithDataset(dataset, build_conf));
// Round-trip: upload the built index, drop the original instance, then load
// a fresh index from the uploaded binary files.
milvus::index::IndexBasePtr new_index;
milvus::index::VectorIndex* vec_index = nullptr;
auto binary_set = index->Upload();
index.reset();
// binary_map_ keys are the uploaded index file names used for loading.
std::vector<std::string> index_files;
for (auto& binary : binary_set.binary_map_) {
index_files.emplace_back(binary.first);
}
new_index = milvus::index::IndexFactory::GetInstance().CreateIndex(
create_index_info, file_manager_context);
load_conf = generate_load_conf(index_type, metric_type, 0);
load_conf["index_files"] = index_files;
vec_index = dynamic_cast<milvus::index::VectorIndex*>(new_index.get());
vec_index->Load(milvus::tracer::TraceContext{}, load_conf);
// All NB rows must survive the round trip, empty rows included.
EXPECT_EQ(vec_index->Count(), NB);
// Fetch rows back in random id order and check each returned row matches
// the originally inserted row element-by-element (empty rows stay empty).
auto ids_ds = GenRandomIds(NB);
auto sparse_rows = vec_index->GetSparseVector(ids_ds);
for (size_t i = 0; i < NB; ++i) {
auto id = ids_ds->GetIds()[i];
auto& row = sparse_rows[i];
ASSERT_EQ(row.size(), vec[id].size());
for (size_t j = 0; j < row.size(); ++j) {
ASSERT_EQ(row[j].id, vec[id][j].id);
ASSERT_EQ(row[j].val, vec[id][j].val);
}
}
}
#ifdef BUILD_DISK_ANN #ifdef BUILD_DISK_ANN
TEST(Indexing, SearchDiskAnnWithInvalidParam) { TEST(Indexing, SearchDiskAnnWithInvalidParam) {
int64_t NB = 1000; int64_t NB = 1000;

View File

@ -1557,8 +1557,8 @@ func trimSparseFloatArray(vec *schemapb.SparseFloatArray) {
func ValidateSparseFloatRows(rows ...[]byte) error { func ValidateSparseFloatRows(rows ...[]byte) error {
for _, row := range rows { for _, row := range rows {
if len(row) == 0 { if row == nil {
return errors.New("empty sparse float vector row") return errors.New("nil sparse float vector")
} }
if len(row)%8 != 0 { if len(row)%8 != 0 {
return fmt.Errorf("invalid data length in sparse float vector: %d", len(row)) return fmt.Errorf("invalid data length in sparse float vector: %d", len(row))
@ -1647,7 +1647,8 @@ func CreateSparseFloatRowFromMap(input map[string]interface{}) ([]byte, error) {
var values []float32 var values []float32
if len(input) == 0 { if len(input) == 0 {
return nil, fmt.Errorf("empty JSON input") // for empty json input, return empty sparse row
return CreateSparseFloatRow(indices, values), nil
} }
getValue := func(key interface{}) (float32, error) { getValue := func(key interface{}) (float32, error) {
@ -1743,9 +1744,6 @@ func CreateSparseFloatRowFromMap(input map[string]interface{}) ([]byte, error) {
if len(indices) != len(values) { if len(indices) != len(values) {
return nil, fmt.Errorf("indices and values length mismatch") return nil, fmt.Errorf("indices and values length mismatch")
} }
if len(indices) == 0 {
return nil, fmt.Errorf("empty indices/values in JSON input")
}
sortedIndices, sortedValues := SortSparseFloatRow(indices, values) sortedIndices, sortedValues := SortSparseFloatRow(indices, values)
row := CreateSparseFloatRow(sortedIndices, sortedValues) row := CreateSparseFloatRow(sortedIndices, sortedValues)
@ -1766,7 +1764,8 @@ func CreateSparseFloatRowFromJSON(input []byte) ([]byte, error) {
return CreateSparseFloatRowFromMap(vec) return CreateSparseFloatRowFromMap(vec)
} }
// dim of a sparse float vector is the maximum/last index + 1 // dim of a sparse float vector is the maximum/last index + 1.
// for an empty row, dim is 0.
func SparseFloatRowDim(row []byte) int64 { func SparseFloatRowDim(row []byte) int64 {
if len(row) == 0 { if len(row) == 0 {
return 0 return 0

View File

@ -939,7 +939,7 @@ func TestAppendFieldData(t *testing.T) {
SparseFloatVector := &schemapb.SparseFloatArray{ SparseFloatVector := &schemapb.SparseFloatArray{
Dim: 231, Dim: 231,
Contents: [][]byte{ Contents: [][]byte{
CreateSparseFloatRow([]uint32{30, 41, 52}, []float32{1.1, 1.2, 1.3}), CreateSparseFloatRow([]uint32{}, []float32{}),
CreateSparseFloatRow([]uint32{60, 80, 230}, []float32{2.1, 2.2, 2.3}), CreateSparseFloatRow([]uint32{60, 80, 230}, []float32{2.1, 2.2, 2.3}),
}, },
} }
@ -1515,7 +1515,7 @@ func TestGetDataAndGetDataSize(t *testing.T) {
SparseFloatVector := &schemapb.SparseFloatArray{ SparseFloatVector := &schemapb.SparseFloatArray{
Dim: 231, Dim: 231,
Contents: [][]byte{ Contents: [][]byte{
CreateSparseFloatRow([]uint32{30, 41, 52}, []float32{1.1, 1.2, 1.3}), CreateSparseFloatRow([]uint32{}, []float32{}),
CreateSparseFloatRow([]uint32{60, 80, 230}, []float32{2.1, 2.2, 2.3}), CreateSparseFloatRow([]uint32{60, 80, 230}, []float32{2.1, 2.2, 2.3}),
}, },
} }
@ -1587,7 +1587,7 @@ func TestMergeFieldData(t *testing.T) {
// 3 rows for src // 3 rows for src
CreateSparseFloatRow([]uint32{600, 800, 2300}, []float32{2.1, 2.2, 2.3}), CreateSparseFloatRow([]uint32{600, 800, 2300}, []float32{2.1, 2.2, 2.3}),
CreateSparseFloatRow([]uint32{90, 141, 352}, []float32{1.1, 1.2, 1.3}), CreateSparseFloatRow([]uint32{90, 141, 352}, []float32{1.1, 1.2, 1.3}),
CreateSparseFloatRow([]uint32{160, 280, 340}, []float32{2.1, 2.2, 2.3}), CreateSparseFloatRow([]uint32{}, []float32{}),
} }
t.Run("merge data", func(t *testing.T) { t.Run("merge data", func(t *testing.T) {
@ -2187,6 +2187,10 @@ func TestValidateSparseFloatRows(t *testing.T) {
CreateSparseFloatRow([]uint32{1, 3, 5}, []float32{1.0, 2.0, 3.0}), CreateSparseFloatRow([]uint32{1, 3, 5}, []float32{1.0, 2.0, 3.0}),
CreateSparseFloatRow([]uint32{2, 4, 6}, []float32{4.0, 5.0, 6.0}), CreateSparseFloatRow([]uint32{2, 4, 6}, []float32{4.0, 5.0, 6.0}),
CreateSparseFloatRow([]uint32{0, 7, 8}, []float32{7.0, 8.0, 9.0}), CreateSparseFloatRow([]uint32{0, 7, 8}, []float32{7.0, 8.0, 9.0}),
// we allow empty rows (including rows whose stored indices all have a 0 value)
CreateSparseFloatRow([]uint32{}, []float32{}),
CreateSparseFloatRow([]uint32{24}, []float32{0}),
{},
} }
err := ValidateSparseFloatRows(rows...) err := ValidateSparseFloatRows(rows...)
assert.NoError(t, err) assert.NoError(t, err)
@ -2257,14 +2261,6 @@ func TestValidateSparseFloatRows(t *testing.T) {
assert.Error(t, err) assert.Error(t, err)
}) })
t.Run("empty indices or values", func(t *testing.T) {
rows := [][]byte{
CreateSparseFloatRow([]uint32{}, []float32{}),
}
err := ValidateSparseFloatRows(rows...)
assert.Error(t, err)
})
t.Run("no rows", func(t *testing.T) { t.Run("no rows", func(t *testing.T) {
err := ValidateSparseFloatRows() err := ValidateSparseFloatRows()
assert.NoError(t, err) assert.NoError(t, err)
@ -2300,6 +2296,20 @@ func TestParseJsonSparseFloatRow(t *testing.T) {
assert.Equal(t, CreateSparseFloatRow([]uint32{math.MaxInt32 + 1}, []float32{1.0}), res) assert.Equal(t, CreateSparseFloatRow([]uint32{math.MaxInt32 + 1}, []float32{1.0}), res)
}) })
t.Run("valid row 5", func(t *testing.T) {
row := map[string]interface{}{"indices": []interface{}{}, "values": []interface{}{}}
res, err := CreateSparseFloatRowFromMap(row)
assert.NoError(t, err)
assert.Equal(t, CreateSparseFloatRow([]uint32{}, []float32{}), res)
})
t.Run("valid row 6", func(t *testing.T) {
row := map[string]interface{}{"indices": []interface{}{1}, "values": []interface{}{0}}
res, err := CreateSparseFloatRowFromMap(row)
assert.NoError(t, err)
assert.Equal(t, CreateSparseFloatRow([]uint32{1}, []float32{0}), res)
})
t.Run("invalid row 1", func(t *testing.T) { t.Run("invalid row 1", func(t *testing.T) {
row := map[string]interface{}{"indices": []interface{}{1, 3, 5}, "values": []interface{}{1.0, 2.0}} row := map[string]interface{}{"indices": []interface{}{1, 3, 5}, "values": []interface{}{1.0, 2.0}}
_, err := CreateSparseFloatRowFromMap(row) _, err := CreateSparseFloatRowFromMap(row)
@ -2312,12 +2322,6 @@ func TestParseJsonSparseFloatRow(t *testing.T) {
assert.Error(t, err) assert.Error(t, err)
}) })
t.Run("invalid row 3", func(t *testing.T) {
row := map[string]interface{}{"indices": []interface{}{}, "values": []interface{}{}}
_, err := CreateSparseFloatRowFromMap(row)
assert.Error(t, err)
})
t.Run("invalid row 4", func(t *testing.T) { t.Run("invalid row 4", func(t *testing.T) {
row := map[string]interface{}{"indices": []interface{}{3}, "values": []interface{}{-0.2}} row := map[string]interface{}{"indices": []interface{}{3}, "values": []interface{}{-0.2}}
_, err := CreateSparseFloatRowFromMap(row) _, err := CreateSparseFloatRowFromMap(row)
@ -2368,6 +2372,20 @@ func TestParseJsonSparseFloatRow(t *testing.T) {
assert.Equal(t, CreateSparseFloatRow([]uint32{1, 3, 5}, []float32{2.0, 1.0, 3.0}), res) assert.Equal(t, CreateSparseFloatRow([]uint32{1, 3, 5}, []float32{2.0, 1.0, 3.0}), res)
}) })
t.Run("valid dict row 3", func(t *testing.T) {
row := map[string]interface{}{}
res, err := CreateSparseFloatRowFromMap(row)
assert.NoError(t, err)
assert.Equal(t, CreateSparseFloatRow([]uint32{}, []float32{}), res)
})
t.Run("valid dict row 4", func(t *testing.T) {
row := map[string]interface{}{"1": 0}
res, err := CreateSparseFloatRowFromMap(row)
assert.NoError(t, err)
assert.Equal(t, CreateSparseFloatRow([]uint32{1}, []float32{0}), res)
})
t.Run("invalid dict row 1", func(t *testing.T) { t.Run("invalid dict row 1", func(t *testing.T) {
row := map[string]interface{}{"a": 1.0, "3": 2.0, "5": 3.0} row := map[string]interface{}{"a": 1.0, "3": 2.0, "5": 3.0}
_, err := CreateSparseFloatRowFromMap(row) _, err := CreateSparseFloatRowFromMap(row)
@ -2398,12 +2416,6 @@ func TestParseJsonSparseFloatRow(t *testing.T) {
assert.Error(t, err) assert.Error(t, err)
}) })
t.Run("invalid dict row 6", func(t *testing.T) {
row := map[string]interface{}{}
_, err := CreateSparseFloatRowFromMap(row)
assert.Error(t, err)
})
t.Run("invalid dict row 7", func(t *testing.T) { t.Run("invalid dict row 7", func(t *testing.T) {
row := map[string]interface{}{fmt.Sprint(math.MaxUint32): 1.0, "3": 2.0, "5": 3.0} row := map[string]interface{}{fmt.Sprint(math.MaxUint32): 1.0, "3": 2.0, "5": 3.0}
_, err := CreateSparseFloatRowFromMap(row) _, err := CreateSparseFloatRowFromMap(row)
@ -2452,6 +2464,20 @@ func TestParseJsonSparseFloatRowBytes(t *testing.T) {
assert.Equal(t, CreateSparseFloatRow([]uint32{math.MaxInt32 + 1}, []float32{1.0}), res) assert.Equal(t, CreateSparseFloatRow([]uint32{math.MaxInt32 + 1}, []float32{1.0}), res)
}) })
t.Run("valid row 4", func(t *testing.T) {
row := []byte(`{"indices":[], "values":[]}`)
res, err := CreateSparseFloatRowFromJSON(row)
assert.NoError(t, err)
assert.Equal(t, CreateSparseFloatRow([]uint32{}, []float32{}), res)
})
t.Run("valid row 5", func(t *testing.T) {
row := []byte(`{"indices":[1], "values":[0]}`)
res, err := CreateSparseFloatRowFromJSON(row)
assert.NoError(t, err)
assert.Equal(t, CreateSparseFloatRow([]uint32{1}, []float32{0}), res)
})
t.Run("invalid row 1", func(t *testing.T) { t.Run("invalid row 1", func(t *testing.T) {
row := []byte(`{"indices":[1,3,5],"values":[1.0,2.0,3.0`) row := []byte(`{"indices":[1,3,5],"values":[1.0,2.0,3.0`)
_, err := CreateSparseFloatRowFromJSON(row) _, err := CreateSparseFloatRowFromJSON(row)
@ -2508,6 +2534,20 @@ func TestParseJsonSparseFloatRowBytes(t *testing.T) {
assert.Equal(t, CreateSparseFloatRow([]uint32{1, 3, 5}, []float32{2.0, 1.0, 3.0}), res) assert.Equal(t, CreateSparseFloatRow([]uint32{1, 3, 5}, []float32{2.0, 1.0, 3.0}), res)
}) })
t.Run("valid dict row 3", func(t *testing.T) {
row := []byte(`{}`)
res, err := CreateSparseFloatRowFromJSON(row)
assert.NoError(t, err)
assert.Equal(t, CreateSparseFloatRow([]uint32{}, []float32{}), res)
})
t.Run("valid dict row 4", func(t *testing.T) {
row := []byte(`{"1": 0}`)
res, err := CreateSparseFloatRowFromJSON(row)
assert.NoError(t, err)
assert.Equal(t, CreateSparseFloatRow([]uint32{1}, []float32{0}), res)
})
t.Run("invalid dict row 1", func(t *testing.T) { t.Run("invalid dict row 1", func(t *testing.T) {
row := []byte(`{"a": 1.0, "3": 2.0, "5": 3.0}`) row := []byte(`{"a": 1.0, "3": 2.0, "5": 3.0}`)
_, err := CreateSparseFloatRowFromJSON(row) _, err := CreateSparseFloatRowFromJSON(row)
@ -2545,7 +2585,7 @@ func TestParseJsonSparseFloatRowBytes(t *testing.T) {
}) })
t.Run("invalid dict row 7", func(t *testing.T) { t.Run("invalid dict row 7", func(t *testing.T) {
row := []byte(`{}`) row := []byte(``)
_, err := CreateSparseFloatRowFromJSON(row) _, err := CreateSparseFloatRowFromJSON(row)
assert.Error(t, err) assert.Error(t, err)
}) })

View File

@ -183,7 +183,7 @@ func (s *SparseTestSuite) TestSparse_invalid_insert() {
s.NotEqual(insertResult.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success) s.NotEqual(insertResult.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
sparseVecs.Contents[0] = sparseVecs.Contents[0][:len(sparseVecs.Contents[0])-4] sparseVecs.Contents[0] = sparseVecs.Contents[0][:len(sparseVecs.Contents[0])-4]
// empty row is not allowed // empty row is allowed
sparseVecs.Contents[0] = []byte{} sparseVecs.Contents[0] = []byte{}
insertResult, err = c.Proxy.Insert(ctx, &milvuspb.InsertRequest{ insertResult, err = c.Proxy.Insert(ctx, &milvuspb.InsertRequest{
DbName: dbName, DbName: dbName,
@ -193,7 +193,7 @@ func (s *SparseTestSuite) TestSparse_invalid_insert() {
NumRows: uint32(rowNum), NumRows: uint32(rowNum),
}) })
s.NoError(err) s.NoError(err)
s.NotEqual(insertResult.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success) s.Equal(insertResult.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
// unsorted column index is not allowed // unsorted column index is not allowed
sparseVecs.Contents[0] = make([]byte, 16) sparseVecs.Contents[0] = make([]byte, 16)