Add query node segment filter (#7303)

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
pull/7346/head
godchen 2021-08-28 10:12:00 +08:00 committed by GitHub
parent 2dc2cb1a28
commit c333af0dcf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 150 additions and 29 deletions

View File

@ -13,6 +13,8 @@ package datanode
import (
"context"
"encoding/binary"
"errors"
"go.uber.org/zap"
@ -55,6 +57,27 @@ func (ddn *deleteNode) Operate(in []Msg) []Msg {
return []Msg{}
}
// getSegmentsByPKs returns, for each segment, the subset of the given primary
// keys that may reside in that segment according to the segment's bloom
// filter. Because bloom filters can report false positives, the returned
// per-segment pk lists are candidate sets, not exact matches.
//
// It returns an error if either pks or segments is nil.
func getSegmentsByPKs(pks []int64, segments []*Segment) (map[int64][]int64, error) {
	if pks == nil {
		return nil, errors.New("pks is nil when getSegmentsByPKs")
	}
	if segments == nil {
		return nil, errors.New("segments is nil when getSegmentsByPKs")
	}
	results := make(map[int64][]int64)
	buf := make([]byte, 8)
	// Encode each pk once and probe every segment's filter with it, instead
	// of re-encoding the same pk for every (segment, pk) pair. Per-segment
	// append order is unchanged: pks are still appended in input order.
	for _, pk := range pks {
		binary.BigEndian.PutUint64(buf, uint64(pk))
		for _, segment := range segments {
			if segment.pkFilter.Test(buf) {
				results[segment.segmentID] = append(results[segment.segmentID], pk)
			}
		}
	}
	return results, nil
}
func newDeleteDNode(ctx context.Context, replica Replica) *deleteNode {
baseNode := BaseNode{}
baseNode.SetMaxParallelism(Params.FlowGraphMaxQueueLength)

View File

@ -13,8 +13,10 @@ package datanode
import (
"context"
"encoding/binary"
"testing"
"github.com/bits-and-blooms/bloom/v3"
"github.com/stretchr/testify/assert"
)
@ -35,3 +37,53 @@ func TestFlowGraphDeleteNode_Operate_Invalid_Size(t *testing.T) {
result := deleteNode.Operate([]Msg{Msg1, Msg2})
assert.Equal(t, len(result), 0)
}
// TestGetSegmentsByPKs checks that getSegmentsByPKs maps each primary key to
// every segment whose bloom filter contains it, and that nil pks or nil
// segments are rejected with an error.
func TestGetSegmentsByPKs(t *testing.T) {
	encode := func(pk int64) []byte {
		b := make([]byte, 8)
		binary.BigEndian.PutUint64(b, uint64(pk))
		return b
	}

	// filter1 holds pks 0..2, filter2 holds pks 3..4.
	filter1 := bloom.NewWithEstimates(1000000, 0.01)
	for pk := int64(0); pk < 3; pk++ {
		filter1.Add(encode(pk))
	}
	filter2 := bloom.NewWithEstimates(1000000, 0.01)
	for pk := int64(3); pk < 5; pk++ {
		filter2.Add(encode(pk))
	}

	// Segments 1-3 share filter1; segments 4-5 share filter2.
	segments := make([]*Segment, 0, 5)
	for id := int64(1); id <= 5; id++ {
		f := filter1
		if id > 3 {
			f = filter2
		}
		segments = append(segments, &Segment{segmentID: id, pkFilter: f})
	}

	results, err := getSegmentsByPKs([]int64{0, 1, 2, 3, 4}, segments)
	assert.Nil(t, err)
	expected := map[int64][]int64{
		1: {0, 1, 2},
		2: {0, 1, 2},
		3: {0, 1, 2},
		4: {3, 4},
		5: {3, 4},
	}
	assert.Equal(t, expected, results)

	// Nil arguments must both produce an error.
	_, err = getSegmentsByPKs(nil, segments)
	assert.NotNil(t, err)
	_, err = getSegmentsByPKs([]int64{0, 1, 2, 3, 4}, nil)
	assert.NotNil(t, err)
}

View File

@ -1136,6 +1136,27 @@ func (q *queryCollection) retrieve(msg queryMsg) error {
return nil
}
// getSegmentsByPKs returns, for each segment, the subset of the given primary
// keys that may reside in that segment according to the segment's bloom
// filter. Because bloom filters can report false positives, the returned
// per-segment pk lists are candidate sets, not exact matches.
//
// It returns an error if either pks or segments is nil.
func getSegmentsByPKs(pks []int64, segments []*Segment) (map[int64][]int64, error) {
	if pks == nil {
		return nil, fmt.Errorf("pks is nil when getSegmentsByPKs")
	}
	if segments == nil {
		return nil, fmt.Errorf("segments is nil when getSegmentsByPKs")
	}
	results := make(map[int64][]int64)
	buf := make([]byte, 8)
	// Encode each pk once and probe every segment's filter with it, instead
	// of re-encoding the same pk for every (segment, pk) pair. Per-segment
	// append order is unchanged: pks are still appended in input order.
	for _, pk := range pks {
		binary.BigEndian.PutUint64(buf, uint64(pk))
		for _, segment := range segments {
			if segment.pkFilter.Test(buf) {
				results[segment.segmentID] = append(results[segment.segmentID], pk)
			}
		}
	}
	return results, nil
}
func mergeRetrieveResults(dataArr []*segcorepb.RetrieveResults) (*segcorepb.RetrieveResults, error) {
var final *segcorepb.RetrieveResults
for _, data := range dataArr {

View File

@ -7,6 +7,7 @@ import (
"math/rand"
"testing"
"github.com/bits-and-blooms/bloom/v3"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
@ -128,3 +129,53 @@ func TestQueryCollection_withoutVChannel(t *testing.T) {
historical.close()
streaming.close()
}
// TestGetSegmentsByPKs checks that getSegmentsByPKs maps each primary key to
// every segment whose bloom filter contains it, and that nil pks or nil
// segments are rejected with an error.
func TestGetSegmentsByPKs(t *testing.T) {
	// buf is reused for each 8-byte big-endian pk encoding.
	buf := make([]byte, 8)
	// filter1 holds pks 0..2.
	filter1 := bloom.NewWithEstimates(1000000, 0.01)
	for i := 0; i < 3; i++ {
		binary.BigEndian.PutUint64(buf, uint64(i))
		filter1.Add(buf)
	}
	// filter2 holds pks 3..4.
	filter2 := bloom.NewWithEstimates(1000000, 0.01)
	for i := 3; i < 5; i++ {
		binary.BigEndian.PutUint64(buf, uint64(i))
		filter2.Add(buf)
	}
	// Segments 1-3 share filter1; segments 4-5 share filter2.
	segment1 := &Segment{
		segmentID: 1,
		pkFilter:  filter1,
	}
	segment2 := &Segment{
		segmentID: 2,
		pkFilter:  filter1,
	}
	segment3 := &Segment{
		segmentID: 3,
		pkFilter:  filter1,
	}
	segment4 := &Segment{
		segmentID: 4,
		pkFilter:  filter2,
	}
	segment5 := &Segment{
		segmentID: 5,
		pkFilter:  filter2,
	}
	segments := []*Segment{segment1, segment2, segment3, segment4, segment5}
	results, err := getSegmentsByPKs([]int64{0, 1, 2, 3, 4}, segments)
	assert.Nil(t, err)
	// Each segment should report exactly the pks its filter was loaded with
	// (false positives are statistically negligible at these filter sizes).
	expected := map[int64][]int64{
		1: {0, 1, 2},
		2: {0, 1, 2},
		3: {0, 1, 2},
		4: {3, 4},
		5: {3, 4},
	}
	assert.Equal(t, expected, results)
	// Nil arguments must both produce an error.
	_, err = getSegmentsByPKs(nil, segments)
	assert.NotNil(t, err)
	_, err = getSegmentsByPKs([]int64{0, 1, 2, 3, 4}, nil)
	assert.NotNil(t, err)
}

View File

@ -29,6 +29,7 @@ import (
"sync"
"unsafe"
"github.com/bits-and-blooms/bloom/v3"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
@ -90,6 +91,8 @@ type Segment struct {
vectorFieldMutex sync.RWMutex // guards vectorFieldInfos
vectorFieldInfos map[UniqueID]*VectorFieldInfo
pkFilter *bloom.BloomFilter // bloom filter of pk inside a segment
}
//-------------------------------------------------------------------------------------- common interfaces

View File

@ -1,29 +0,0 @@
package segmentfilter
import (
"github.com/bits-and-blooms/bloom/v3"
"github.com/milvus-io/milvus/internal/proto/datapb"
)
// SegmentFilter is used to know which segments may have data corresponding
// to the primary key.
type SegmentFilter struct {
	segmentInfos []*datapb.SegmentInfo // metadata of candidate segments
	bloomFilters []*bloom.BloomFilter  // per-segment pk filters; presumably parallel to segmentInfos — TODO confirm
}
// NewSegmentFilter creates a SegmentFilter over the given segment infos.
// NOTE(review): bloomFilters is left nil here; presumably init() is meant to
// populate it — confirm before relying on this constructor.
func NewSegmentFilter(segmentInfos []*datapb.SegmentInfo) *SegmentFilter {
	return &SegmentFilter{
		segmentInfos: segmentInfos,
	}
}
// init is an unimplemented stub; calling it always panics.
func (sf *SegmentFilter) init() {
	panic("This method has not been implemented")
}
// GetSegmentByPK takes a list of primary keys and returns a map of
// <segmentID, []string{primary_key}>.
// Currently an unimplemented stub; calling it always panics.
func (sf *SegmentFilter) GetSegmentByPK(pk []string) map[int64][]string {
	panic("This method has not been implemented")
}