Check pk filter merge error (#9952)

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
pull/9968/head
godchen 2021-10-15 16:58:42 +08:00 committed by GitHub
parent 356b7e36ba
commit 9e9ad7c7e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 37 additions and 4 deletions

View File

@ -310,7 +310,6 @@ func (replica *SegmentReplica) addNormalSegment(segID, collID, partitionID Uniqu
checkPoint: *cp,
endPos: &cp.pos,
//TODO silverxia, normal segments bloom filter and pk range should be loaded from serialized files
pkFilter: bloom.NewWithEstimates(bloomFilterSize, maxBloomFalsePositive),
minPK: math.MaxInt64, // use max value, represents no value
maxPK: math.MinInt64, // use min value represents no value
@ -331,7 +330,10 @@ func (replica *SegmentReplica) addNormalSegment(segID, collID, partitionID Uniqu
return err
}
for _, stat := range stats {
seg.pkFilter.Merge(stat.BF)
err = seg.pkFilter.Merge(stat.BF)
if err != nil {
return err
}
if seg.minPK > stat.Min {
seg.minPK = stat.Min
}
@ -396,7 +398,10 @@ func (replica *SegmentReplica) addFlushedSegment(segID, collID, partitionID Uniq
return err
}
for _, stat := range stats {
seg.pkFilter.Merge(stat.BF)
err = seg.pkFilter.Merge(stat.BF)
if err != nil {
return err
}
if seg.minPK > stat.Min {
seg.minPK = stat.Min
}

View File

@ -52,6 +52,21 @@ func (kv *mockMinioKV) LoadWithPrefix(prefix string) ([]string, []string, error)
return []string{"0"}, []string{string(buffer)}, nil
}
type mockPkfilterMergeError struct {
kv.BaseKV
}
func (kv *mockPkfilterMergeError) LoadWithPrefix(prefix string) ([]string, []string, error) {
stats := &storage.Int64Stats{
FieldID: common.RowIDField,
Min: 0,
Max: 10,
BF: bloom.NewWithEstimates(1, 0.0001),
}
buffer, _ := json.Marshal(stats)
return []string{"0"}, []string{string(buffer)}, nil
}
type mockMinioKVError struct {
kv.BaseKV
}
@ -513,7 +528,7 @@ func TestSegmentReplica_InterfaceMethod(te *testing.T) {
assert.NotNil(to, err)
})
te.Run("Test_addNormalSegmentStatsError", func(to *testing.T) {
te.Run("Test_addSegmentStatsError", func(to *testing.T) {
sr, err := newReplica(context.Background(), rc, 1)
assert.Nil(to, err)
sr.minIOKV = &mockMinioKVStatsError{}
@ -526,6 +541,19 @@ func TestSegmentReplica_InterfaceMethod(te *testing.T) {
assert.NotNil(to, err)
})
te.Run("Test_addSegmentPkfilterError", func(to *testing.T) {
sr, err := newReplica(context.Background(), rc, 1)
assert.Nil(to, err)
sr.minIOKV = &mockPkfilterMergeError{}
cpPos := &internalpb.MsgPosition{ChannelName: "insert-01", Timestamp: Timestamp(10)}
cp := &segmentCheckPoint{int64(10), *cpPos}
err = sr.addNormalSegment(1, 1, 2, "insert-01", int64(10), cp)
assert.NotNil(to, err)
err = sr.addFlushedSegment(1, 1, 2, "insert-01", int64(0))
assert.NotNil(to, err)
})
te.Run("Test inner function segment", func(t *testing.T) {
collID := UniqueID(1)
replica, err := newReplica(context.Background(), rc, collID)