mirror of https://github.com/milvus-io/milvus.git
Init segment replica bloom filter (#9770)
Signed-off-by: godchen <qingxiang.chen@zilliz.com>pull/9854/head
parent
964c8ff414
commit
a2dc0d8808
|
@ -300,7 +300,10 @@ func (node *DataNode) NewDataSyncService(vchan *datapb.VchannelInfo) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
replica := newReplica(node.rootCoord, vchan.CollectionID)
|
||||
replica, err := newReplica(node.ctx, node.rootCoord, vchan.CollectionID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var alloc allocatorInterface = newAllocator(node.rootCoord)
|
||||
|
||||
|
|
|
@ -125,7 +125,8 @@ func TestDataSyncService_newDataSyncService(te *testing.T) {
|
|||
te.Run(test.description, func(t *testing.T) {
|
||||
df := &DataCoordFactory{}
|
||||
|
||||
replica := newReplica(&RootCoordFactory{}, test.collID)
|
||||
replica, err := newReplica(context.Background(), &RootCoordFactory{}, test.collID)
|
||||
assert.Nil(t, err)
|
||||
if test.replicaNil {
|
||||
replica = nil
|
||||
}
|
||||
|
@ -197,7 +198,8 @@ func TestDataSyncService_Start(t *testing.T) {
|
|||
collectionID := UniqueID(1)
|
||||
|
||||
flushChan := &flushChans{make(chan *flushMsg, 100), make(chan *flushMsg, 100)}
|
||||
replica := newReplica(mockRootCoord, collectionID)
|
||||
replica, err := newReplica(context.Background(), mockRootCoord, collectionID)
|
||||
assert.Nil(t, err)
|
||||
|
||||
allocFactory := NewAllocatorFactory(1)
|
||||
msFactory := msgstream.NewPmsFactory()
|
||||
|
@ -205,7 +207,7 @@ func TestDataSyncService_Start(t *testing.T) {
|
|||
"pulsarAddress": pulsarURL,
|
||||
"receiveBufSize": 1024,
|
||||
"pulsarBufSize": 1024}
|
||||
err := msFactory.SetParams(m)
|
||||
err = msFactory.SetParams(m)
|
||||
assert.Nil(t, err)
|
||||
|
||||
insertChannelName := "data_sync_service_test_dml"
|
||||
|
|
|
@ -65,7 +65,8 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) {
|
|||
collMeta := Factory.CollectionMetaFactory(UniqueID(0), "coll1")
|
||||
mockRootCoord := &RootCoordFactory{}
|
||||
|
||||
replica := newReplica(mockRootCoord, collMeta.ID)
|
||||
replica, err := newReplica(ctx, mockRootCoord, collMeta.ID)
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = replica.addNewSegment(1, collMeta.ID, 0, insertChannelName, &internalpb.MsgPosition{}, &internalpb.MsgPosition{})
|
||||
require.NoError(t, err)
|
||||
|
@ -160,7 +161,8 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
|
|||
collMeta := Factory.CollectionMetaFactory(UniqueID(0), "coll1")
|
||||
mockRootCoord := &RootCoordFactory{}
|
||||
|
||||
replica := newReplica(mockRootCoord, collMeta.ID)
|
||||
replica, err := newReplica(ctx, mockRootCoord, collMeta.ID)
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = replica.addNewSegment(1, collMeta.ID, 0, insertChannelName, &internalpb.MsgPosition{}, &internalpb.MsgPosition{})
|
||||
require.NoError(t, err)
|
||||
|
@ -218,9 +220,10 @@ func TestFlushSegment(t *testing.T) {
|
|||
flushMap := sync.Map{}
|
||||
mockRootCoord := &RootCoordFactory{}
|
||||
|
||||
replica := newReplica(mockRootCoord, collMeta.ID)
|
||||
replica, err := newReplica(ctx, mockRootCoord, collMeta.ID)
|
||||
assert.Nil(t, err)
|
||||
|
||||
err := replica.addNewSegment(segmentID, collMeta.ID, 0, insertChannelName, &internalpb.MsgPosition{}, &internalpb.MsgPosition{})
|
||||
err = replica.addNewSegment(segmentID, collMeta.ID, 0, insertChannelName, &internalpb.MsgPosition{}, &internalpb.MsgPosition{})
|
||||
require.NoError(t, err)
|
||||
replica.updateSegmentEndPosition(segmentID, &internalpb.MsgPosition{ChannelName: "TestChannel"})
|
||||
|
||||
|
@ -593,7 +596,8 @@ func TestInsertBufferNode_getCollMetaBySegID(t *testing.T) {
|
|||
compactTs: 100,
|
||||
}
|
||||
|
||||
replica := newReplica(mockRootCoord, collMeta.ID)
|
||||
replica, err := newReplica(ctx, mockRootCoord, collMeta.ID)
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = replica.addNewSegment(1, collMeta.ID, 0, insertChannelName, &internalpb.MsgPosition{}, &internalpb.MsgPosition{})
|
||||
require.NoError(t, err)
|
||||
|
@ -652,7 +656,8 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) {
|
|||
compactTs: 100,
|
||||
}
|
||||
|
||||
replica := newReplica(mockRootCoord, collMeta.ID)
|
||||
replica, err := newReplica(ctx, mockRootCoord, collMeta.ID)
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = replica.addNewSegment(1, collMeta.ID, 0, insertChannelName, &internalpb.MsgPosition{}, &internalpb.MsgPosition{})
|
||||
require.NoError(t, err)
|
||||
|
@ -713,8 +718,11 @@ func TestInsertBufferNode_updateSegStatesInReplica(te *testing.T) {
|
|||
}
|
||||
|
||||
for _, test := range invalideTests {
|
||||
replica, err := newReplica(context.Background(), &RootCoordFactory{}, test.replicaCollID)
|
||||
assert.Nil(te, err)
|
||||
|
||||
ibNode := &insertBufferNode{
|
||||
replica: newReplica(&RootCoordFactory{}, test.replicaCollID),
|
||||
replica: replica,
|
||||
}
|
||||
|
||||
im := []*msgstream.InsertMsg{
|
||||
|
|
|
@ -16,16 +16,21 @@ import (
|
|||
"encoding/binary"
|
||||
"fmt"
|
||||
"math"
|
||||
"path"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/bits-and-blooms/bloom/v3"
|
||||
"github.com/milvus-io/milvus/internal/common"
|
||||
"github.com/milvus-io/milvus/internal/kv"
|
||||
miniokv "github.com/milvus-io/milvus/internal/kv/minio"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
"github.com/milvus-io/milvus/internal/types"
|
||||
)
|
||||
|
||||
|
@ -90,6 +95,7 @@ type SegmentReplica struct {
|
|||
flushedSegments map[UniqueID]*Segment
|
||||
|
||||
metaService *metaService
|
||||
minIOKV kv.BaseKV
|
||||
}
|
||||
|
||||
func (s *Segment) updatePKRange(rowIDs []int64) {
|
||||
|
@ -108,10 +114,25 @@ func (s *Segment) updatePKRange(rowIDs []int64) {
|
|||
|
||||
var _ Replica = &SegmentReplica{}
|
||||
|
||||
func newReplica(rc types.RootCoord, collID UniqueID) Replica {
|
||||
func newReplica(ctx context.Context, rc types.RootCoord, collID UniqueID) (*SegmentReplica, error) {
|
||||
// MinIO
|
||||
option := &miniokv.Option{
|
||||
Address: Params.MinioAddress,
|
||||
AccessKeyID: Params.MinioAccessKeyID,
|
||||
SecretAccessKeyID: Params.MinioSecretAccessKey,
|
||||
UseSSL: Params.MinioUseSSL,
|
||||
CreateBucket: true,
|
||||
BucketName: Params.MinioBucketName,
|
||||
}
|
||||
|
||||
minIOKV, err := miniokv.NewMinIOKV(ctx, option)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
metaService := newMetaService(rc, collID)
|
||||
|
||||
var replica Replica = &SegmentReplica{
|
||||
replica := &SegmentReplica{
|
||||
collectionID: collID,
|
||||
|
||||
newSegments: make(map[UniqueID]*Segment),
|
||||
|
@ -119,8 +140,10 @@ func newReplica(rc types.RootCoord, collID UniqueID) Replica {
|
|||
flushedSegments: make(map[UniqueID]*Segment),
|
||||
|
||||
metaService: metaService,
|
||||
minIOKV: minIOKV,
|
||||
}
|
||||
return replica
|
||||
|
||||
return replica, nil
|
||||
}
|
||||
|
||||
// segmentFlushed transfers a segment from *New* or *Normal* into *Flushed*.
|
||||
|
@ -293,6 +316,31 @@ func (replica *SegmentReplica) addNormalSegment(segID, collID, partitionID Uniqu
|
|||
maxPK: math.MinInt64, // use min value represents no value
|
||||
}
|
||||
|
||||
p := path.Join(Params.StatsBinlogRootPath, JoinIDPath(collID, partitionID, segID, common.RowIDField))
|
||||
keys, values, err := replica.minIOKV.LoadWithPrefix(p)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
blobs := make([]*Blob, 0)
|
||||
for i := 0; i < len(keys); i++ {
|
||||
blobs = append(blobs, &Blob{Key: keys[i], Value: []byte(values[i])})
|
||||
}
|
||||
|
||||
stats, err := storage.DeserializeStats(blobs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, stat := range stats {
|
||||
seg.pkFilter.Merge(stat.BF)
|
||||
if seg.minPK > stat.Min {
|
||||
seg.minPK = stat.Min
|
||||
}
|
||||
|
||||
if seg.maxPK < stat.Max {
|
||||
seg.maxPK = stat.Max
|
||||
}
|
||||
}
|
||||
|
||||
seg.isNew.Store(false)
|
||||
seg.isFlushed.Store(false)
|
||||
|
||||
|
@ -333,6 +381,31 @@ func (replica *SegmentReplica) addFlushedSegment(segID, collID, partitionID Uniq
|
|||
maxPK: math.MinInt64, // use min value represents no value
|
||||
}
|
||||
|
||||
p := path.Join(Params.StatsBinlogRootPath, JoinIDPath(collID, partitionID, segID, common.RowIDField))
|
||||
keys, values, err := replica.minIOKV.LoadWithPrefix(p)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
blobs := make([]*Blob, 0)
|
||||
for i := 0; i < len(keys); i++ {
|
||||
blobs = append(blobs, &Blob{Key: keys[i], Value: []byte(values[i])})
|
||||
}
|
||||
|
||||
stats, err := storage.DeserializeStats(blobs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, stat := range stats {
|
||||
seg.pkFilter.Merge(stat.BF)
|
||||
if seg.minPK > stat.Min {
|
||||
seg.minPK = stat.Min
|
||||
}
|
||||
|
||||
if seg.maxPK < stat.Max {
|
||||
seg.maxPK = stat.Max
|
||||
}
|
||||
}
|
||||
|
||||
seg.isNew.Store(false)
|
||||
seg.isFlushed.Store(true)
|
||||
|
||||
|
|
|
@ -12,7 +12,10 @@
|
|||
package datanode
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math"
|
||||
"math/rand"
|
||||
"testing"
|
||||
|
@ -21,31 +24,50 @@ import (
|
|||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/common"
|
||||
"github.com/milvus-io/milvus/internal/kv"
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
"github.com/milvus-io/milvus/internal/types"
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
)
|
||||
|
||||
func newSegmentReplica(rc types.RootCoord, collID UniqueID) *SegmentReplica {
|
||||
metaService := newMetaService(rc, collID)
|
||||
|
||||
var replica = &SegmentReplica{
|
||||
collectionID: collID,
|
||||
|
||||
newSegments: make(map[UniqueID]*Segment),
|
||||
normalSegments: make(map[UniqueID]*Segment),
|
||||
flushedSegments: make(map[UniqueID]*Segment),
|
||||
|
||||
metaService: metaService,
|
||||
}
|
||||
return replica
|
||||
}
|
||||
|
||||
func TestNewReplica(t *testing.T) {
|
||||
rc := &RootCoordFactory{}
|
||||
replica := newReplica(rc, 0)
|
||||
replica, err := newReplica(context.Background(), rc, 0)
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, replica)
|
||||
}
|
||||
|
||||
type mockMinioKV struct {
|
||||
kv.BaseKV
|
||||
}
|
||||
|
||||
func (kv *mockMinioKV) LoadWithPrefix(prefix string) ([]string, []string, error) {
|
||||
stats := &storage.Int64Stats{
|
||||
FieldID: common.RowIDField,
|
||||
Min: 0,
|
||||
Max: 10,
|
||||
BF: bloom.NewWithEstimates(bloomFilterSize, maxBloomFalsePositive),
|
||||
}
|
||||
buffer, _ := json.Marshal(stats)
|
||||
return []string{"0"}, []string{string(buffer)}, nil
|
||||
}
|
||||
|
||||
type mockMinioKVError struct {
|
||||
kv.BaseKV
|
||||
}
|
||||
|
||||
func (kv *mockMinioKVError) LoadWithPrefix(prefix string) ([]string, []string, error) {
|
||||
return nil, nil, fmt.Errorf("mock error")
|
||||
}
|
||||
|
||||
type mockMinioKVStatsError struct {
|
||||
kv.BaseKV
|
||||
}
|
||||
|
||||
func (kv *mockMinioKVStatsError) LoadWithPrefix(prefix string) ([]string, []string, error) {
|
||||
return []string{"0"}, []string{"3123123,error,test"}, nil
|
||||
}
|
||||
|
||||
func TestSegmentReplica_getCollectionAndPartitionID(te *testing.T) {
|
||||
tests := []struct {
|
||||
segInNew UniqueID
|
||||
|
@ -122,9 +144,10 @@ func TestSegmentReplica(t *testing.T) {
|
|||
collID := UniqueID(1)
|
||||
|
||||
t.Run("Test coll mot match", func(t *testing.T) {
|
||||
replica := newSegmentReplica(rc, collID)
|
||||
replica, err := newReplica(context.Background(), rc, collID)
|
||||
assert.Nil(t, err)
|
||||
|
||||
err := replica.addNewSegment(1, collID+1, 0, "", nil, nil)
|
||||
err = replica.addNewSegment(1, collID+1, 0, "", nil, nil)
|
||||
assert.NotNil(t, err)
|
||||
})
|
||||
|
||||
|
@ -220,9 +243,10 @@ func TestSegmentReplica_InterfaceMethod(te *testing.T) {
|
|||
|
||||
for _, test := range tests {
|
||||
to.Run(test.description, func(t *testing.T) {
|
||||
sr := newSegmentReplica(rc, test.replicaCollID)
|
||||
sr, err := newReplica(context.Background(), rc, test.replicaCollID)
|
||||
assert.Nil(t, err)
|
||||
require.False(t, sr.hasSegment(test.inSegID, true))
|
||||
err := sr.addNewSegment(test.inSegID,
|
||||
err = sr.addNewSegment(test.inSegID,
|
||||
test.inCollID, 1, "", test.instartPos, &internalpb.MsgPosition{})
|
||||
if test.isValidCase {
|
||||
assert.NoError(t, err)
|
||||
|
@ -255,9 +279,11 @@ func TestSegmentReplica_InterfaceMethod(te *testing.T) {
|
|||
|
||||
for _, test := range tests {
|
||||
to.Run(test.description, func(t *testing.T) {
|
||||
sr := newSegmentReplica(rc, test.replicaCollID)
|
||||
sr, err := newReplica(context.Background(), rc, test.replicaCollID)
|
||||
sr.minIOKV = &mockMinioKV{}
|
||||
assert.Nil(t, err)
|
||||
require.False(t, sr.hasSegment(test.inSegID, true))
|
||||
err := sr.addNormalSegment(test.inSegID, test.inCollID, 1, "", 0, &segmentCheckPoint{})
|
||||
err = sr.addNormalSegment(test.inSegID, test.inCollID, 1, "", 0, &segmentCheckPoint{})
|
||||
if test.isValidCase {
|
||||
assert.NoError(t, err)
|
||||
assert.True(t, sr.hasSegment(test.inSegID, true))
|
||||
|
@ -452,7 +478,8 @@ func TestSegmentReplica_InterfaceMethod(te *testing.T) {
|
|||
|
||||
for _, test := range tests {
|
||||
to.Run(test.description, func(t *testing.T) {
|
||||
sr := newSegmentReplica(rc, test.replicaCollID)
|
||||
sr, err := newReplica(context.Background(), rc, test.replicaCollID)
|
||||
assert.Nil(t, err)
|
||||
|
||||
if test.metaServiceErr {
|
||||
rc.setCollectionID(-1)
|
||||
|
@ -473,15 +500,43 @@ func TestSegmentReplica_InterfaceMethod(te *testing.T) {
|
|||
|
||||
})
|
||||
|
||||
te.Run("Test_addSegmentMinIOLoadError", func(to *testing.T) {
|
||||
sr, err := newReplica(context.Background(), rc, 1)
|
||||
assert.Nil(to, err)
|
||||
sr.minIOKV = &mockMinioKVError{}
|
||||
|
||||
cpPos := &internalpb.MsgPosition{ChannelName: "insert-01", Timestamp: Timestamp(10)}
|
||||
cp := &segmentCheckPoint{int64(10), *cpPos}
|
||||
err = sr.addNormalSegment(1, 1, 2, "insert-01", int64(10), cp)
|
||||
assert.NotNil(to, err)
|
||||
err = sr.addFlushedSegment(1, 1, 2, "insert-01", int64(0))
|
||||
assert.NotNil(to, err)
|
||||
})
|
||||
|
||||
te.Run("Test_addNormalSegmentStatsError", func(to *testing.T) {
|
||||
sr, err := newReplica(context.Background(), rc, 1)
|
||||
assert.Nil(to, err)
|
||||
sr.minIOKV = &mockMinioKVStatsError{}
|
||||
|
||||
cpPos := &internalpb.MsgPosition{ChannelName: "insert-01", Timestamp: Timestamp(10)}
|
||||
cp := &segmentCheckPoint{int64(10), *cpPos}
|
||||
err = sr.addNormalSegment(1, 1, 2, "insert-01", int64(10), cp)
|
||||
assert.NotNil(to, err)
|
||||
err = sr.addFlushedSegment(1, 1, 2, "insert-01", int64(0))
|
||||
assert.NotNil(to, err)
|
||||
})
|
||||
|
||||
te.Run("Test inner function segment", func(t *testing.T) {
|
||||
collID := UniqueID(1)
|
||||
replica := newSegmentReplica(rc, collID)
|
||||
replica, err := newReplica(context.Background(), rc, collID)
|
||||
assert.Nil(t, err)
|
||||
replica.minIOKV = &mockMinioKV{}
|
||||
assert.False(t, replica.hasSegment(0, true))
|
||||
assert.False(t, replica.hasSegment(0, false))
|
||||
|
||||
startPos := &internalpb.MsgPosition{ChannelName: "insert-01", Timestamp: Timestamp(100)}
|
||||
endPos := &internalpb.MsgPosition{ChannelName: "insert-01", Timestamp: Timestamp(200)}
|
||||
err := replica.addNewSegment(0, 1, 2, "insert-01", startPos, endPos)
|
||||
err = replica.addNewSegment(0, 1, 2, "insert-01", startPos, endPos)
|
||||
assert.NoError(t, err)
|
||||
assert.True(t, replica.hasSegment(0, true))
|
||||
assert.Equal(t, 1, len(replica.newSegments))
|
||||
|
@ -599,9 +654,11 @@ func TestReplica_UpdatePKRange(t *testing.T) {
|
|||
cpPos := &internalpb.MsgPosition{ChannelName: chanName, Timestamp: Timestamp(10)}
|
||||
cp := &segmentCheckPoint{int64(10), *cpPos}
|
||||
|
||||
replica := newSegmentReplica(rc, collID)
|
||||
replica, err := newReplica(context.Background(), rc, collID)
|
||||
assert.Nil(t, err)
|
||||
replica.minIOKV = &mockMinioKV{}
|
||||
|
||||
err := replica.addNewSegment(1, collID, partID, chanName, startPos, endPos)
|
||||
err = replica.addNewSegment(1, collID, partID, chanName, startPos, endPos)
|
||||
assert.Nil(t, err)
|
||||
err = replica.addNormalSegment(2, collID, partID, chanName, 100, cp)
|
||||
assert.Nil(t, err)
|
||||
|
|
Loading…
Reference in New Issue