Refine bloomfilter and memory usage (#20168)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>

Xiaofan 2022-10-31 17:41:34 +08:00 committed by GitHub
parent c6151ad351
commit 2bfecf5b4e
29 changed files with 850 additions and 676 deletions


@ -422,10 +422,17 @@ func (m *meta) UpdateFlushSegmentsInfo(
}
}
clonedSegment.Binlogs = currBinlogs
// statlogs, overwrite latest segment stats log
if len(statslogs) > 0 {
clonedSegment.Statslogs = statslogs
// statlogs
currStatsLogs := clonedSegment.GetStatslogs()
for _, tStatsLogs := range statslogs {
fieldStatsLog := getFieldBinlogs(tStatsLogs.GetFieldID(), currStatsLogs)
if fieldStatsLog == nil {
currStatsLogs = append(currStatsLogs, tStatsLogs)
} else {
fieldStatsLog.Binlogs = append(fieldStatsLog.Binlogs, tStatsLogs.Binlogs...)
}
}
clonedSegment.Statslogs = currStatsLogs
// deltalogs
currDeltaLogs := clonedSegment.GetDeltalogs()
for _, tDeltaLogs := range deltalogs {


@ -541,7 +541,19 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) {
Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statslog0", "statslog1")},
Deltalogs: []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000}}}},
}}
assert.True(t, proto.Equal(expected, updated))
fmt.Println(updated)
fmt.Println(expected)
assert.Equal(t, updated.StartPosition, expected.StartPosition)
assert.Equal(t, updated.DmlPosition, expected.DmlPosition)
assert.Equal(t, updated.DmlPosition, expected.DmlPosition)
assert.Equal(t, len(updated.Binlogs[0].Binlogs), len(expected.Binlogs[0].Binlogs))
assert.Equal(t, len(updated.Statslogs[0].Binlogs), len(expected.Statslogs[0].Binlogs))
assert.Equal(t, len(updated.Deltalogs[0].Binlogs), len(expected.Deltalogs[0].Binlogs))
assert.Equal(t, updated.State, expected.State)
assert.Equal(t, updated.size, expected.size)
assert.Equal(t, updated.NumOfRows, expected.NumOfRows)
})
t.Run("update non-existed segment", func(t *testing.T) {


@ -52,9 +52,8 @@ type uploader interface {
//
// errUploadToBlobStorage is returned if ctx is canceled from outside while an upload is in progress.
// Beware of the ctx here: if no timeout or cancel is applied, the upload may retry forever.
upload(ctx context.Context, segID, partID UniqueID, iData []*InsertData, segStats []byte, dData *DeleteData, meta *etcdpb.CollectionMeta) (*segPaths, error)
uploadInsertLog(ctx context.Context, segID, partID UniqueID, iData *InsertData, meta *etcdpb.CollectionMeta) (map[UniqueID]*datapb.FieldBinlog, error)
uploadStatsLog(ctx context.Context, segID, partID UniqueID, segStats []byte, meta *etcdpb.CollectionMeta) ([]*datapb.FieldBinlog, error)
upload(ctx context.Context, segID, partID UniqueID, iData []*InsertData, dData *DeleteData, meta *etcdpb.CollectionMeta) (*segPaths, error)
uploadInsertLog(ctx context.Context, segID, partID UniqueID, iData *InsertData, meta *etcdpb.CollectionMeta) (map[UniqueID]*datapb.FieldBinlog, map[UniqueID]*datapb.FieldBinlog, error)
uploadDeltaLog(ctx context.Context, segID, partID UniqueID, dData *DeleteData, meta *etcdpb.CollectionMeta) ([]*datapb.FieldBinlog, error)
}
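For orientation: the reworked interface drops uploadStatsLog and the pre-serialized segStats argument, because stats binlogs are now generated together with the insert binlogs. A minimal caller sketch, assuming the datanode package context of this file; the wrapper function and its logging are illustrative, not part of the patch:

func uploadInsertAndStats(ctx context.Context, b uploader, segID, partID UniqueID,
	iData *InsertData, meta *etcdpb.CollectionMeta) error {
	// uploadInsertLog now returns stats-log paths alongside insert-log paths.
	inPaths, statsPaths, err := b.uploadInsertLog(ctx, segID, partID, iData, meta)
	if err != nil {
		return err
	}
	// One map entry per field for inserts, one per primary-key field for stats;
	// no separate uploadStatsLog round trip is needed anymore.
	log.Info("upload done",
		zap.Int("insertFields", len(inPaths)),
		zap.Int("statsFields", len(statsPaths)))
	return nil
}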
@ -141,7 +140,6 @@ func (b *binlogIO) upload(
ctx context.Context,
segID, partID UniqueID,
iDatas []*InsertData,
segStats []byte,
dData *DeleteData,
meta *etcdpb.CollectionMeta) (*segPaths, error) {
@ -163,7 +161,7 @@ func (b *binlogIO) upload(
continue
}
kv, inpaths, err := b.genInsertBlobs(iData, partID, segID, meta)
blobs, inpaths, statspaths, err := b.genInsertBlobs(iData, partID, segID, meta)
if err != nil {
log.Warn("generate insert blobs wrong",
zap.Int64("collectionID", meta.GetID()),
@ -172,7 +170,7 @@ func (b *binlogIO) upload(
return nil, err
}
for k, v := range kv {
for k, v := range blobs {
kvs[k] = v
}
@ -185,25 +183,16 @@ func (b *binlogIO) upload(
}
insertField2Path[fID] = tmpBinlog
}
}
pkID := getPKID(meta)
if pkID == common.InvalidFieldID {
log.Error("get invalid field id when finding pk", zap.Int64("collectionID", meta.GetID()), zap.Any("fields", meta.GetSchema().GetFields()))
return nil, errors.New("invalid pk id")
}
logID, err := b.allocID()
if err != nil {
return nil, err
}
k := metautil.JoinIDPath(meta.GetID(), partID, segID, pkID, logID)
key := path.Join(b.ChunkManager.RootPath(), common.SegmentStatslogPath, k)
fileLen := len(segStats)
kvs[key] = segStats
statsField2Path[pkID] = &datapb.FieldBinlog{
FieldID: pkID,
Binlogs: []*datapb.Binlog{{LogSize: int64(fileLen), LogPath: key}},
for fID, path := range statspaths {
tmpBinlog, ok := statsField2Path[fID]
if !ok {
tmpBinlog = path
} else {
tmpBinlog.Binlogs = append(tmpBinlog.Binlogs, path.GetBinlogs()...)
}
statsField2Path[fID] = tmpBinlog
}
}
for _, path := range insertField2Path {
@ -236,7 +225,7 @@ func (b *binlogIO) upload(
})
}
err = b.uploadSegmentFiles(ctx, meta.GetID(), segID, kvs)
err := b.uploadSegmentFiles(ctx, meta.GetID(), segID, kvs)
if err != nil {
return nil, err
}
@ -263,24 +252,25 @@ func (b *binlogIO) genDeltaBlobs(data *DeleteData, collID, partID, segID UniqueI
}
// genInsertBlobs returns kvs, insert-paths, stats-paths
func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, meta *etcdpb.CollectionMeta) (map[string][]byte, map[UniqueID]*datapb.FieldBinlog, error) {
func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, meta *etcdpb.CollectionMeta) (map[string][]byte, map[UniqueID]*datapb.FieldBinlog, map[UniqueID]*datapb.FieldBinlog, error) {
inCodec := storage.NewInsertCodec(meta)
inlogs, _, err := inCodec.Serialize(partID, segID, data)
inlogs, statslogs, err := inCodec.Serialize(partID, segID, data)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
var (
kvs = make(map[string][]byte, len(inlogs)+1)
inpaths = make(map[UniqueID]*datapb.FieldBinlog)
kvs = make(map[string][]byte, len(inlogs)+len(statslogs))
inpaths = make(map[UniqueID]*datapb.FieldBinlog)
statspaths = make(map[UniqueID]*datapb.FieldBinlog)
)
notifyGenIdx := make(chan struct{})
defer close(notifyGenIdx)
generator, err := b.idxGenerator(len(inlogs)+1, notifyGenIdx)
generator, err := b.idxGenerator(len(inlogs)+len(statslogs), notifyGenIdx)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
for _, blob := range inlogs {
@ -299,7 +289,24 @@ func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, meta
}
}
return kvs, inpaths, nil
for _, blob := range statslogs {
// Blob Key is generated by Serialize from int64 fieldID in collection schema, which won't raise error in ParseInt
fID, _ := strconv.ParseInt(blob.GetKey(), 10, 64)
k := metautil.JoinIDPath(meta.GetID(), partID, segID, fID, <-generator)
key := path.Join(b.ChunkManager.RootPath(), common.SegmentStatslogPath, k)
value := blob.GetValue()
fileLen := len(value)
kvs[key] = value
statspaths[fID] = &datapb.FieldBinlog{
FieldID: fID,
Binlogs: []*datapb.Binlog{{LogSize: int64(fileLen), LogPath: key}},
}
}
return kvs, inpaths, statspaths, nil
}
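As a quick reference for the new genInsertBlobs contract, the four results line up as sketched below; the annotations are descriptive only, and the call itself simply mirrors the usages above:

// kvs:        storage key -> serialized blob, covering insert and stats logs alike
// inpaths:    fieldID -> insert binlog path(s) for that field
// statspaths: primary-key fieldID -> stats binlog path(s) under SegmentStatslogPath
kvs, inpaths, statspaths, err := b.genInsertBlobs(data, partID, segID, meta)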
func (b *binlogIO) idxGenerator(n int, done <-chan struct{}) (<-chan UniqueID, error) {
@ -330,11 +337,10 @@ func (b *binlogIO) uploadInsertLog(
segID UniqueID,
partID UniqueID,
iData *InsertData,
meta *etcdpb.CollectionMeta) (map[UniqueID]*datapb.FieldBinlog, error) {
meta *etcdpb.CollectionMeta) (map[UniqueID]*datapb.FieldBinlog, map[UniqueID]*datapb.FieldBinlog, error) {
var (
kvs = make(map[string][]byte)
insertField2Path = make(map[UniqueID]*datapb.FieldBinlog)
statsField2Path = make(map[UniqueID]*datapb.FieldBinlog)
)
tf, ok := iData.Data[common.TimeStampField]
@ -343,20 +349,16 @@ func (b *binlogIO) uploadInsertLog(
zap.Int64("segmentID", segID),
zap.Int64("collectionID", meta.GetID()),
)
return nil, nil
return nil, nil, nil
}
kv, inpaths, err := b.genInsertBlobs(iData, partID, segID, meta)
kvs, inpaths, statspaths, err := b.genInsertBlobs(iData, partID, segID, meta)
if err != nil {
log.Warn("generate insert blobs wrong",
zap.Int64("collectionID", meta.GetID()),
zap.Int64("segmentID", segID),
zap.Error(err))
return nil, err
}
for k, v := range kv {
kvs[k] = v
return nil, nil, err
}
for fID, path := range inpaths {
@ -369,53 +371,21 @@ func (b *binlogIO) uploadInsertLog(
insertField2Path[fID] = tmpBinlog
}
err = b.uploadSegmentFiles(ctx, meta.GetID(), segID, kvs)
if err != nil {
return nil, err
for fID, path := range statspaths {
tmpBinlog, ok := statsField2Path[fID]
if !ok {
tmpBinlog = path
} else {
tmpBinlog.Binlogs = append(tmpBinlog.Binlogs, path.GetBinlogs()...)
}
statsField2Path[fID] = tmpBinlog
}
return insertField2Path, nil
}
func (b *binlogIO) uploadStatsLog(
ctx context.Context,
segID UniqueID,
partID UniqueID,
segStats []byte,
meta *etcdpb.CollectionMeta) ([]*datapb.FieldBinlog, error) {
var (
statsInfo = make([]*datapb.FieldBinlog, 0)
kvs = make(map[string][]byte)
)
pkID := getPKID(meta)
if pkID == common.InvalidFieldID {
log.Error("get invalid field id when finding pk", zap.Int64("collectionID", meta.GetID()), zap.Any("fields", meta.GetSchema().GetFields()))
return nil, errors.New("invalid pk id")
}
logID, err := b.allocID()
if err != nil {
return nil, err
}
k := metautil.JoinIDPath(meta.GetID(), partID, segID, pkID, logID)
key := path.Join(b.ChunkManager.RootPath(), common.SegmentStatslogPath, k)
fileLen := len(segStats)
kvs[key] = segStats
statsInfo = append(statsInfo, &datapb.FieldBinlog{
FieldID: pkID,
Binlogs: []*datapb.Binlog{{
LogPath: key,
LogSize: int64(fileLen),
}},
})
err = b.uploadSegmentFiles(ctx, meta.GetID(), segID, kvs)
if err != nil {
return nil, err
return nil, nil, err
}
return statsInfo, nil
return insertField2Path, statsField2Path, nil
}
func (b *binlogIO) uploadDeltaLog(


@ -27,6 +27,7 @@ import (
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
@ -55,7 +56,7 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
Tss: []uint64{666666},
}
p, err := b.upload(context.TODO(), 1, 10, []*InsertData{iData}, []byte{}, dData, meta)
p, err := b.upload(context.TODO(), 1, 10, []*InsertData{iData}, dData, meta)
assert.NoError(t, err)
assert.Equal(t, 12, len(p.inPaths))
assert.Equal(t, 1, len(p.statsPaths))
@ -63,25 +64,21 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
assert.Equal(t, 1, len(p.statsPaths[0].GetBinlogs()))
assert.NotNil(t, p.deltaInfo)
p, err = b.upload(context.TODO(), 1, 10, []*InsertData{iData, iData}, []byte{}, dData, meta)
p, err = b.upload(context.TODO(), 1, 10, []*InsertData{iData, iData}, dData, meta)
assert.NoError(t, err)
assert.Equal(t, 12, len(p.inPaths))
assert.Equal(t, 1, len(p.statsPaths))
assert.Equal(t, 2, len(p.inPaths[0].GetBinlogs()))
assert.Equal(t, 1, len(p.statsPaths[0].GetBinlogs()))
assert.Equal(t, 2, len(p.statsPaths[0].GetBinlogs()))
assert.NotNil(t, p.deltaInfo)
ctx, cancel := context.WithCancel(context.Background())
in, err := b.uploadInsertLog(ctx, 1, 10, iData, meta)
in, stats, err := b.uploadInsertLog(ctx, 1, 10, iData, meta)
assert.NoError(t, err)
assert.Equal(t, 12, len(in))
assert.Equal(t, 1, len(in[0].GetBinlogs()))
stats, err := b.uploadStatsLog(ctx, 1, 10, []byte{}, meta)
assert.NoError(t, err)
assert.Equal(t, 1, len(stats))
assert.Equal(t, 1, len(stats[0].GetBinlogs()))
deltas, err := b.uploadDeltaLog(ctx, 1, 10, dData, meta)
assert.NoError(t, err)
@ -90,18 +87,14 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
cancel()
p, err = b.upload(ctx, 1, 10, []*InsertData{iData}, []byte{}, dData, meta)
p, err = b.upload(ctx, 1, 10, []*InsertData{iData}, dData, meta)
assert.EqualError(t, err, errUploadToBlobStorage.Error())
assert.Nil(t, p)
in, err = b.uploadInsertLog(ctx, 1, 10, iData, meta)
in, _, err = b.uploadInsertLog(ctx, 1, 10, iData, meta)
assert.EqualError(t, err, errUploadToBlobStorage.Error())
assert.Nil(t, in)
stats, err = b.uploadStatsLog(ctx, 1, 10, []byte{}, meta)
assert.EqualError(t, err, errUploadToBlobStorage.Error())
assert.Nil(t, stats)
deltas, err = b.uploadDeltaLog(ctx, 1, 10, dData, meta)
assert.EqualError(t, err, errUploadToBlobStorage.Error())
assert.Nil(t, deltas)
@ -117,17 +110,17 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
}
iData := genEmptyInsertData()
p, err := b.upload(context.TODO(), 1, 10, []*InsertData{iData}, []byte{}, dData, meta)
p, err := b.upload(context.TODO(), 1, 10, []*InsertData{iData}, dData, meta)
assert.NoError(t, err)
assert.Empty(t, p.inPaths)
assert.NotEmpty(t, p.statsPaths)
assert.Empty(t, p.statsPaths)
assert.Empty(t, p.deltaInfo)
iData = &InsertData{Data: make(map[int64]storage.FieldData)}
p, err = b.upload(context.TODO(), 1, 10, []*InsertData{iData}, []byte{}, dData, meta)
p, err = b.upload(context.TODO(), 1, 10, []*InsertData{iData}, dData, meta)
assert.NoError(t, err)
assert.Empty(t, p.inPaths)
assert.NotEmpty(t, p.statsPaths)
assert.Empty(t, p.statsPaths)
assert.Empty(t, p.deltaInfo)
iData = genInsertData()
@ -136,7 +129,7 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
Tss: []uint64{1},
RowCount: 1,
}
p, err = b.upload(context.TODO(), 1, 10, []*InsertData{iData}, []byte{}, dData, meta)
p, err = b.upload(context.TODO(), 1, 10, []*InsertData{iData}, dData, meta)
assert.Error(t, err)
assert.Empty(t, p)
@ -150,35 +143,27 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
RowCount: 1,
}
ctx, cancel := context.WithTimeout(context.TODO(), 20*time.Millisecond)
p, err = bin.upload(ctx, 1, 10, []*InsertData{iData}, []byte{}, dData, meta)
p, err = bin.upload(ctx, 1, 10, []*InsertData{iData}, dData, meta)
assert.Error(t, err)
assert.Empty(t, p)
in, err := b.uploadInsertLog(ctx, 1, 10, iData, meta)
in, _, err := b.uploadInsertLog(ctx, 1, 10, iData, meta)
assert.Error(t, err)
assert.Empty(t, in)
stats, err := b.uploadStatsLog(ctx, 1, 10, []byte{}, meta)
assert.Error(t, err)
assert.Empty(t, stats)
deltas, err := b.uploadDeltaLog(ctx, 1, 10, dData, meta)
assert.Error(t, err)
assert.Empty(t, deltas)
alloc.isvalid = false
p, err = bin.upload(ctx, 1, 10, []*InsertData{iData}, []byte{}, dData, meta)
p, err = bin.upload(ctx, 1, 10, []*InsertData{iData}, dData, meta)
assert.Error(t, err)
assert.Empty(t, p)
in, err = b.uploadInsertLog(ctx, 1, 10, iData, meta)
in, _, err = b.uploadInsertLog(ctx, 1, 10, iData, meta)
assert.Error(t, err)
assert.Empty(t, in)
stats, err = b.uploadStatsLog(ctx, 1, 10, []byte{}, meta)
assert.Error(t, err)
assert.Empty(t, stats)
deltas, err = b.uploadDeltaLog(ctx, 1, 10, dData, meta)
assert.Error(t, err)
assert.Empty(t, deltas)
@ -187,18 +172,14 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
for _, field := range meta.GetSchema().GetFields() {
field.IsPrimaryKey = false
}
p, err = bin.upload(ctx, 1, 10, []*InsertData{iData}, []byte{}, dData, meta)
p, err = bin.upload(ctx, 1, 10, []*InsertData{iData}, dData, meta)
assert.Error(t, err)
assert.Empty(t, p)
in, err = b.uploadInsertLog(ctx, 1, 10, iData, meta)
in, _, err = b.uploadInsertLog(ctx, 1, 10, iData, meta)
assert.Error(t, err)
assert.Empty(t, in)
stats, err = b.uploadStatsLog(ctx, 1, 10, []byte{}, meta)
assert.Error(t, err)
assert.Empty(t, stats)
deltas, err = b.uploadDeltaLog(ctx, 1, 10, dData, meta)
assert.Error(t, err)
assert.Empty(t, deltas)
@ -349,46 +330,52 @@ func TestBinlogIOInnerMethods(t *testing.T) {
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
meta := f.GetCollectionMeta(UniqueID(10001), "test_gen_blobs", test.pkType)
kvs, pin, err := b.genInsertBlobs(genInsertData(), 10, 1, meta)
if test.expectError {
assert.Error(t, err)
return
}
helper, err := typeutil.CreateSchemaHelper(meta.Schema)
assert.NoError(t, err)
primaryKeyFieldSchema, err := helper.GetPrimaryKeyField()
assert.NoError(t, err)
primaryKeyFieldID := primaryKeyFieldSchema.GetFieldID()
kvs, pin, pstats, err := b.genInsertBlobs(genInsertData(), 10, 1, meta)
assert.NoError(t, err)
assert.Equal(t, 1, len(pstats))
assert.Equal(t, 12, len(pin))
assert.Equal(t, 12, len(kvs))
assert.Equal(t, 13, len(kvs))
log.Debug("test paths",
zap.Any("kvs no.", len(kvs)),
zap.String("insert paths field0", pin[common.TimeStampField].GetBinlogs()[0].GetLogPath()))
zap.String("insert paths field0", pin[common.TimeStampField].GetBinlogs()[0].GetLogPath()),
zap.String("stats paths field0", pstats[primaryKeyFieldID].GetBinlogs()[0].GetLogPath()))
})
}
})
t.Run("Test genInsertBlobs error", func(t *testing.T) {
kvs, pin, err := b.genInsertBlobs(&InsertData{}, 1, 1, nil)
kvs, pin, pstats, err := b.genInsertBlobs(&InsertData{}, 1, 1, nil)
assert.Error(t, err)
assert.Empty(t, kvs)
assert.Empty(t, pin)
assert.Empty(t, pstats)
f := &MetaFactory{}
meta := f.GetCollectionMeta(UniqueID(10001), "test_gen_blobs", schemapb.DataType_Int64)
kvs, pin, err = b.genInsertBlobs(genEmptyInsertData(), 10, 1, meta)
kvs, pin, pstats, err = b.genInsertBlobs(genEmptyInsertData(), 10, 1, meta)
assert.Error(t, err)
assert.Empty(t, kvs)
assert.Empty(t, pin)
assert.Empty(t, pstats)
errAlloc := NewAllocatorFactory()
errAlloc.errAllocBatch = true
bin := &binlogIO{cm, errAlloc}
kvs, pin, err = bin.genInsertBlobs(genInsertData(), 10, 1, meta)
kvs, pin, pstats, err = bin.genInsertBlobs(genInsertData(), 10, 1, meta)
assert.Error(t, err)
assert.Empty(t, kvs)
assert.Empty(t, pin)
assert.Empty(t, pstats)
})
t.Run("Test idxGenerator", func(t *testing.T) {


@ -20,8 +20,8 @@ import (
"context"
"fmt"
"sync"
"time"
"github.com/bits-and-blooms/bloom/v3"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"go.uber.org/zap"
@ -35,10 +35,6 @@ import (
"github.com/milvus-io/milvus/internal/util/typeutil"
)
const (
maxBloomFalsePositive float64 = 0.005
)
type (
primaryKey = storage.PrimaryKey
int64PrimaryKey = storage.Int64PrimaryKey
@ -72,10 +68,10 @@ type Channel interface {
listCompactedSegmentIDs() map[UniqueID][]UniqueID
updateStatistics(segID UniqueID, numRows int64)
InitPKstats(ctx context.Context, s *Segment, statsBinlogs []*datapb.FieldBinlog, ts Timestamp) error
RollPKstats(segID UniqueID, stats []*storage.PrimaryKeyStats)
getSegmentStatisticsUpdates(segID UniqueID) (*datapb.SegmentStats, error)
segmentFlushed(segID UniqueID)
getSegmentStatslog(segID UniqueID) ([]byte, error)
initSegmentBloomFilter(seg *Segment) error
}
// ChannelMeta contains channel meta and the latest segments infos of the channel.
@ -162,23 +158,6 @@ func (c *ChannelMeta) maxRowCountPerSegment(ts Timestamp) (int64, error) {
return int64(threshold / float64(sizePerRecord)), nil
}
// initSegmentBloomFilter initialize segment pkFilter with a new bloom filter.
// this new BF will be initialized with estimated max rows and default false positive rate.
func (c *ChannelMeta) initSegmentBloomFilter(s *Segment) error {
var ts Timestamp
if s.startPos != nil {
ts = s.startPos.Timestamp
}
maxRowCount, err := c.maxRowCountPerSegment(ts)
if err != nil {
log.Warn("initSegmentBloomFilter failed, cannot estimate max row count", zap.Error(err))
return err
}
s.pkStat.pkFilter = bloom.NewWithEstimates(uint(maxRowCount), maxBloomFalsePositive)
return nil
}
// addSegment adds the segment to current channel. Segments can be added as *new*, *normal* or *flushed*.
// Make sure to verify `channel.hasSegment(segID)` == false before calling `channel.addSegment()`.
func (c *ChannelMeta) addSegment(req addSegmentReq) error {
@ -208,8 +187,8 @@ func (c *ChannelMeta) addSegment(req addSegmentReq) error {
endPos: req.endPos,
}
seg.sType.Store(req.segType)
// Set up bloom filter.
err := c.initPKBloomFilter(context.TODO(), seg, req.statsBinLogs, req.recoverTs)
// Set up pk stats
err := c.InitPKstats(context.TODO(), seg, req.statsBinLogs, req.recoverTs)
if err != nil {
log.Error("failed to init bloom filter",
zap.Int64("segment ID", req.segID),
@ -256,7 +235,8 @@ func (c *ChannelMeta) filterSegments(partitionID UniqueID) []*Segment {
return results
}
func (c *ChannelMeta) initPKBloomFilter(ctx context.Context, s *Segment, statsBinlogs []*datapb.FieldBinlog, ts Timestamp) error {
func (c *ChannelMeta) InitPKstats(ctx context.Context, s *Segment, statsBinlogs []*datapb.FieldBinlog, ts Timestamp) error {
startTs := time.Now()
log := log.With(zap.Int64("segmentID", s.segmentID))
log.Info("begin to init pk bloom filter", zap.Int("stats bin logs", len(statsBinlogs)))
schema, err := c.getCollectionSchema(s.collectionID, ts)
@ -287,10 +267,11 @@ func (c *ChannelMeta) initPKBloomFilter(ctx context.Context, s *Segment, statsBi
// no stats log to parse, initialize a new BF
if len(bloomFilterFiles) == 0 {
log.Warn("no stats files to load, initializa a new one")
return c.initSegmentBloomFilter(s)
log.Warn("no stats files to load")
return nil
}
// read historical PK filter
values, err := c.chunkManager.MultiRead(ctx, bloomFilterFiles)
if err != nil {
log.Warn("failed to load bloom filter files", zap.Error(err))
@ -306,26 +287,46 @@ func (c *ChannelMeta) initPKBloomFilter(ctx context.Context, s *Segment, statsBi
log.Warn("failed to deserialize bloom filter files", zap.Error(err))
return err
}
var size uint
for _, stat := range stats {
// use first BF to merge
if s.pkStat.pkFilter == nil {
s.pkStat.pkFilter = stat.BF
} else {
// for compatibility, statslog before 2.1.2 uses separated stats log which needs to be merged
// assuming all legacy BF has same attributes.
err = s.pkStat.pkFilter.Merge(stat.BF)
if err != nil {
return err
}
pkStat := &storage.PkStatistics{
PkFilter: stat.BF,
MinPK: stat.MinPk,
MaxPK: stat.MaxPk,
}
s.updatePk(stat.MinPk)
s.updatePk(stat.MaxPk)
size += stat.BF.Cap()
s.historyStats = append(s.historyStats, pkStat)
}
log.Info("Successfully load pk stats", zap.Any("time", time.Since(startTs)), zap.Uint("size", size))
return nil
}
func (c *ChannelMeta) RollPKstats(segID UniqueID, stats []*storage.PrimaryKeyStats) {
c.segMu.Lock()
defer c.segMu.Unlock()
seg, ok := c.segments[segID]
log.Info("roll pk stats", zap.Int64("segment id", segID))
if ok && seg.notFlushed() {
for _, stat := range stats {
pkStat := &storage.PkStatistics{
PkFilter: stat.BF,
MinPK: stat.MinPk,
MaxPK: stat.MaxPk,
}
seg.historyStats = append(seg.historyStats, pkStat)
}
seg.currentStat = nil
return
}
// should not happen at all
if ok {
log.Warn("only growing segment should roll PK stats", zap.Int64("segment", segID), zap.Any("type", seg.sType))
} else {
log.Warn("can not find segment", zap.Int64("segment", segID))
}
}
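The model behind RollPKstats: a segment keeps one mutable currentStat for rows still in the insert buffer plus an append-only historyStats slice of storage.PkStatistics for rows already flushed, and a primary-key probe consults both. A rough sketch of such a lookup, assuming PkStatistics exposes a PkExist check as used elsewhere in the datanode; this is illustrative, not the patch's exact isPKExist implementation:

func pkMightExist(s *Segment, pk primaryKey) bool {
	// growing data: statistics not yet rolled by RollPKstats
	if s.currentStat != nil && s.currentStat.PkExist(pk) {
		return true
	}
	// flushed data: statistics appended to history by RollPKstats
	for _, stat := range s.historyStats {
		if stat.PkExist(pk) {
			return true
		}
	}
	return false
}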
// listNewSegmentsStartPositions gets all *New Segments* start positions and
// transfer segments states from *New* to *Normal*.
func (c *ChannelMeta) listNewSegmentsStartPositions() []*datapb.SegmentStartPosition {
@ -517,7 +518,8 @@ func (c *ChannelMeta) mergeFlushedSegments(seg *Segment, planID UniqueID, compac
s.compactedTo = seg.segmentID
s.setType(datapb.SegmentType_Compacted)
// release bloom filter
s.pkStat.pkFilter = nil
s.currentStat = nil
s.historyStats = nil
}
c.segMu.Unlock()
@ -556,11 +558,6 @@ func (c *ChannelMeta) addFlushedSegmentWithPKs(segID, collID, partID UniqueID, n
numRows: numOfRows,
}
err := c.initSegmentBloomFilter(seg)
if err != nil {
return err
}
seg.updatePKRange(ids)
seg.setType(datapb.SegmentType_Flushed)
@ -610,30 +607,3 @@ func (c *ChannelMeta) listNotFlushedSegmentIDs() []UniqueID {
return segIDs
}
// getSegmentStatslog returns the segment statslog for the provided segment id.
func (c *ChannelMeta) getSegmentStatslog(segID UniqueID) ([]byte, error) {
c.segMu.RLock()
defer c.segMu.RUnlock()
colID := c.getCollectionID()
schema, err := c.getCollectionSchema(colID, 0)
if err != nil {
return nil, err
}
var pkID UniqueID
var pkType schemapb.DataType
for _, field := range schema.GetFields() {
if field.GetIsPrimaryKey() {
pkID = field.GetFieldID()
pkType = field.GetDataType()
}
}
if seg, ok := c.segments[segID]; ok && seg.isValid() {
return seg.getSegmentStatslog(pkID, pkType)
}
return nil, fmt.Errorf("segment not found: %d", segID)
}


@ -56,7 +56,7 @@ func (kv *mockDataCM) MultiRead(ctx context.Context, keys []string) ([][]byte, e
FieldID: common.RowIDField,
Min: 0,
Max: 10,
BF: bloom.NewWithEstimates(100000, maxBloomFalsePositive),
BF: bloom.NewWithEstimates(storage.BloomFilterSize, storage.MaxBloomFalsePositive),
}
buffer, _ := json.Marshal(stats)
return [][]byte{buffer}, nil
@ -774,7 +774,6 @@ func TestChannelMeta_UpdatePKRange(t *testing.T) {
for i := 0; i < 100; i++ {
cases = append(cases, rand.Int63())
}
buf := make([]byte, 8)
for _, c := range cases {
channel.updateSegmentPKRange(1, &storage.Int64FieldData{Data: []int64{c}}) // new segment
channel.updateSegmentPKRange(2, &storage.Int64FieldData{Data: []int64{c}}) // normal segment
@ -782,16 +781,8 @@ func TestChannelMeta_UpdatePKRange(t *testing.T) {
pk := newInt64PrimaryKey(c)
assert.Equal(t, true, segNew.pkStat.minPK.LE(pk))
assert.Equal(t, true, segNew.pkStat.maxPK.GE(pk))
assert.Equal(t, true, segNormal.pkStat.minPK.LE(pk))
assert.Equal(t, true, segNormal.pkStat.maxPK.GE(pk))
common.Endian.PutUint64(buf, uint64(c))
assert.True(t, segNew.pkStat.pkFilter.Test(buf))
assert.True(t, segNormal.pkStat.pkFilter.Test(buf))
assert.True(t, segNew.isPKExist(pk))
assert.True(t, segNormal.isPKExist(pk))
}
}
@ -910,50 +901,6 @@ func (s *ChannelMetaSuite) TestHasSegment() {
}
}
func (s *ChannelMetaSuite) TestGetSegmentStatslog() {
s.channel.updateSegmentPKRange(1, &storage.Int64FieldData{Data: []int64{1}})
bs, err := s.channel.getSegmentStatslog(1)
s.NoError(err)
segment, ok := s.getSegmentByID(1)
s.Require().True(ok)
err = segment.updatePKRange(&storage.Int64FieldData{Data: []int64{1}})
s.Require().NoError(err)
expected, err := segment.getSegmentStatslog(106, schemapb.DataType_Int64)
s.Require().NoError(err)
s.Equal(expected, bs)
s.channel.updateSegmentPKRange(2, &storage.Int64FieldData{Data: []int64{2}})
bs, err = s.channel.getSegmentStatslog(2)
s.NoError(err)
segment, ok = s.getSegmentByID(2)
s.Require().True(ok)
err = segment.updatePKRange(&storage.Int64FieldData{Data: []int64{2}})
s.Require().NoError(err)
expected, err = segment.getSegmentStatslog(106, schemapb.DataType_Int64)
s.Require().NoError(err)
s.Equal(expected, bs)
s.channel.updateSegmentPKRange(3, &storage.Int64FieldData{Data: []int64{3}})
bs, err = s.channel.getSegmentStatslog(3)
s.NoError(err)
segment, ok = s.getSegmentByID(3)
s.Require().True(ok)
err = segment.updatePKRange(&storage.Int64FieldData{Data: []int64{3}})
s.Require().NoError(err)
expected, err = segment.getSegmentStatslog(106, schemapb.DataType_Int64)
s.Require().NoError(err)
s.Equal(expected, bs)
_, err = s.channel.getSegmentStatslog(4)
s.Error(err)
_, err = s.channel.getSegmentStatslog(1)
s.ErrorIs(err, errSegmentStatsNotChanged)
}
func (s *ChannelMetaSuite) getSegmentByID(id UniqueID) (*Segment, bool) {
s.channel.segMu.RLock()
defer s.channel.segMu.RUnlock()


@ -192,10 +192,8 @@ func (t *compactionTask) uploadSingleInsertLog(
targetSegID UniqueID,
partID UniqueID,
meta *etcdpb.CollectionMeta,
segment *Segment,
pkID UniqueID,
fID2Content map[UniqueID][]interface{},
fID2Type map[UniqueID]schemapb.DataType) (map[UniqueID]*datapb.FieldBinlog, error) {
fID2Type map[UniqueID]schemapb.DataType) (map[UniqueID]*datapb.FieldBinlog, map[UniqueID]*datapb.FieldBinlog, error) {
iData := &InsertData{
Data: make(map[storage.FieldID]storage.FieldData)}
@ -203,32 +201,23 @@ func (t *compactionTask) uploadSingleInsertLog(
tp, ok := fID2Type[fID]
if !ok {
log.Warn("no field ID in this schema", zap.Int64("fieldID", fID))
return nil, errors.New("Unexpected error")
return nil, nil, errors.New("Unexpected error")
}
fData, err := interface2FieldData(tp, content, int64(len(content)))
if err != nil {
log.Warn("transfer interface to FieldData wrong", zap.Error(err))
return nil, err
return nil, nil, err
}
if fID == pkID {
err = segment.updatePKRange(fData)
if err != nil {
log.Warn("update pk range failed", zap.Error(err))
return nil, err
}
}
iData.Data[fID] = fData
}
inPaths, err := t.uploadInsertLog(ctxTimeout, targetSegID, partID, iData, meta)
inPaths, statPaths, err := t.uploadInsertLog(ctxTimeout, targetSegID, partID, iData, meta)
if err != nil {
return nil, err
return nil, nil, err
}
return inPaths, nil
return inPaths, statPaths, nil
}
func (t *compactionTask) merge(
@ -237,7 +226,7 @@ func (t *compactionTask) merge(
targetSegID UniqueID,
partID UniqueID,
meta *etcdpb.CollectionMeta,
delta map[interface{}]Timestamp) ([]*datapb.FieldBinlog, []*datapb.FieldBinlog, *Segment, int64, error) {
delta map[interface{}]Timestamp) ([]*datapb.FieldBinlog, []*datapb.FieldBinlog, int64, error) {
log := log.With(zap.Int64("planID", t.getPlanID()))
mergeStart := time.Now()
@ -250,18 +239,18 @@ func (t *compactionTask) merge(
err error
// statslog generation
segment = &Segment{} // empty segment used for bf generation
pkID UniqueID
pkType schemapb.DataType
pkID UniqueID
pkType schemapb.DataType
fID2Type = make(map[UniqueID]schemapb.DataType)
fID2Content = make(map[UniqueID][]interface{})
insertField2Path = make(map[UniqueID]*datapb.FieldBinlog)
insertPaths = make([]*datapb.FieldBinlog, 0)
)
t.Channel.initSegmentBloomFilter(segment)
statField2Path = make(map[UniqueID]*datapb.FieldBinlog)
statPaths = make([]*datapb.FieldBinlog, 0)
)
isDeletedValue := func(v *storage.Value) bool {
ts, ok := delta[v.PK.GetValue()]
@ -283,6 +272,18 @@ func (t *compactionTask) merge(
}
}
addStatFieldPath := func(statPaths map[UniqueID]*datapb.FieldBinlog) {
for fID, path := range statPaths {
tmpBinlog, ok := statField2Path[fID]
if !ok {
tmpBinlog = path
} else {
tmpBinlog.Binlogs = append(tmpBinlog.Binlogs, path.GetBinlogs()...)
}
statField2Path[fID] = tmpBinlog
}
}
// get pkID, pkType, dim
for _, fs := range meta.GetSchema().GetFields() {
fID2Type[fs.GetFieldID()] = fs.GetDataType()
@ -296,7 +297,7 @@ func (t *compactionTask) merge(
if t.Key == "dim" {
if dim, err = strconv.Atoi(t.Value); err != nil {
log.Warn("strconv wrong on get dim", zap.Error(err))
return nil, nil, nil, 0, err
return nil, nil, 0, err
}
break
}
@ -312,27 +313,27 @@ func (t *compactionTask) merge(
currentRows := 0
downloadTimeCost := time.Duration(0)
uploadInsertTimeCost := time.Duration(0)
uploadStatsTimeCost := time.Duration(0)
for _, path := range unMergedInsertlogs {
downloadStart := time.Now()
data, err := t.download(ctxTimeout, path)
if err != nil {
log.Warn("download insertlogs wrong")
return nil, nil, nil, 0, err
return nil, nil, 0, err
}
downloadTimeCost += time.Since(downloadStart)
iter, err := storage.NewInsertBinlogIterator(data, pkID, pkType)
if err != nil {
log.Warn("new insert binlogs Itr wrong")
return nil, nil, nil, 0, err
return nil, nil, 0, err
}
for iter.HasNext() {
vInter, _ := iter.Next()
v, ok := vInter.(*storage.Value)
if !ok {
log.Warn("transfer interface to Value wrong")
return nil, nil, nil, 0, errors.New("unexpected error")
return nil, nil, 0, errors.New("unexpected error")
}
if isDeletedValue(v) {
@ -349,7 +350,7 @@ func (t *compactionTask) merge(
row, ok := v.Value.(map[UniqueID]interface{})
if !ok {
log.Warn("transfer interface to map wrong")
return nil, nil, nil, 0, errors.New("unexpected error")
return nil, nil, 0, errors.New("unexpected error")
}
for fID, vInter := range row {
@ -363,12 +364,13 @@ func (t *compactionTask) merge(
if currentRows == maxRowsPerBinlog {
uploadInsertStart := time.Now()
inPaths, err := t.uploadSingleInsertLog(ctxTimeout, targetSegID, partID, meta, segment, pkID, fID2Content, fID2Type)
inPaths, statsPaths, err := t.uploadSingleInsertLog(ctxTimeout, targetSegID, partID, meta, fID2Content, fID2Type)
if err != nil {
return nil, nil, nil, 0, err
return nil, nil, 0, err
}
uploadInsertTimeCost += time.Since(uploadInsertStart)
addInsertFieldPath(inPaths)
addStatFieldPath(statsPaths)
fID2Content = make(map[int64][]interface{})
currentRows = 0
@ -379,13 +381,14 @@ func (t *compactionTask) merge(
}
if currentRows != 0 {
uploadInsertStart := time.Now()
inPaths, err := t.uploadSingleInsertLog(ctxTimeout, targetSegID, partID, meta, segment, pkID, fID2Content, fID2Type)
inPaths, statsPaths, err := t.uploadSingleInsertLog(ctxTimeout, targetSegID, partID, meta, fID2Content, fID2Type)
if err != nil {
return nil, nil, nil, 0, err
return nil, nil, 0, err
}
uploadInsertTimeCost += time.Since(uploadInsertStart)
addInsertFieldPath(inPaths)
addStatFieldPath(statsPaths)
numRows += int64(currentRows)
numBinlogs++
@ -395,31 +398,17 @@ func (t *compactionTask) merge(
insertPaths = append(insertPaths, path)
}
// marshal segment statslog
segStats, err := segment.getSegmentStatslog(pkID, pkType)
if err != nil && !errors.Is(err, errSegmentStatsNotChanged) {
log.Warn("failed to generate segment statslog", zap.Int64("pkID", pkID), zap.Error(err))
return nil, nil, nil, 0, err
}
var statsPaths []*datapb.FieldBinlog
if len(segStats) > 0 {
uploadStatsStart := time.Now()
statsPaths, err = t.uploadStatsLog(ctxTimeout, targetSegID, partID, segStats, meta)
if err != nil {
return nil, nil, nil, 0, err
}
uploadStatsTimeCost += time.Since(uploadStatsStart)
for _, path := range statField2Path {
statPaths = append(statPaths, path)
}
log.Info("merge end", zap.Int64("remaining insert numRows", numRows),
zap.Int64("expired entities", expired), zap.Int("binlog file number", numBinlogs),
zap.Float64("download insert log elapse in ms", nano2Milli(downloadTimeCost)),
zap.Float64("upload insert log elapse in ms", nano2Milli(uploadInsertTimeCost)),
zap.Float64("upload stats log elapse in ms", nano2Milli(uploadStatsTimeCost)),
zap.Float64("merge elapse in ms", nano2Milli(time.Since(mergeStart))))
return insertPaths, statsPaths, segment, numRows, nil
return insertPaths, statPaths, numRows, nil
}
func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
@ -549,7 +538,7 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
return nil, err
}
inPaths, statsPaths, _, numRows, err := t.merge(ctxTimeout, allPs, targetSegID, partID, meta, deltaPk2Ts)
inPaths, statsPaths, numRows, err := t.merge(ctxTimeout, allPs, targetSegID, partID, meta, deltaPk2Ts)
if err != nil {
log.Error("compact wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err))
return nil, err


@ -277,7 +277,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
iData := genInsertDataWithExpiredTS()
var allPaths [][]string
inpath, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta)
inpath, _, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta)
assert.NoError(t, err)
assert.Equal(t, 12, len(inpath))
binlogNum := len(inpath[0].GetBinlogs())
@ -296,7 +296,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
}
ct := &compactionTask{Channel: channel, downloader: mockbIO, uploader: mockbIO}
inPaths, statsPaths, _, numOfRow, err := ct.merge(context.Background(), allPaths, 2, 0, meta, dm)
inPaths, statsPaths, numOfRow, err := ct.merge(context.Background(), allPaths, 2, 0, meta, dm)
assert.NoError(t, err)
assert.Equal(t, int64(2), numOfRow)
assert.Equal(t, 1, len(inPaths[0].GetBinlogs()))
@ -315,7 +315,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64)
var allPaths [][]string
inpath, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta)
inpath, _, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta)
assert.NoError(t, err)
assert.Equal(t, 12, len(inpath))
binlogNum := len(inpath[0].GetBinlogs())
@ -332,12 +332,12 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
dm := map[interface{}]Timestamp{}
ct := &compactionTask{Channel: channel, downloader: mockbIO, uploader: mockbIO}
inPaths, statsPaths, _, numOfRow, err := ct.merge(context.Background(), allPaths, 2, 0, meta, dm)
inPaths, statsPaths, numOfRow, err := ct.merge(context.Background(), allPaths, 2, 0, meta, dm)
assert.NoError(t, err)
assert.Equal(t, int64(2), numOfRow)
assert.Equal(t, 2, len(inPaths[0].GetBinlogs()))
assert.Equal(t, 1, len(statsPaths))
assert.Equal(t, 1, len(statsPaths[0].GetBinlogs()))
assert.Equal(t, 2, len(statsPaths[0].GetBinlogs()))
})
t.Run("Merge with expiration", func(t *testing.T) {
@ -348,7 +348,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64)
var allPaths [][]string
inpath, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta)
inpath, _, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta)
assert.NoError(t, err)
assert.Equal(t, 12, len(inpath))
binlogNum := len(inpath[0].GetBinlogs())
@ -375,7 +375,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
CollectionTtl: 864000,
},
}
inPaths, statsPaths, _, numOfRow, err := ct.merge(context.Background(), allPaths, 2, 0, meta, dm)
inPaths, statsPaths, numOfRow, err := ct.merge(context.Background(), allPaths, 2, 0, meta, dm)
assert.NoError(t, err)
assert.Equal(t, int64(0), numOfRow)
assert.Equal(t, 0, len(inPaths))
@ -390,7 +390,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64)
var allPaths [][]string
inpath, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta)
inpath, _, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta)
assert.NoError(t, err)
assert.Equal(t, 12, len(inpath))
binlogNum := len(inpath[0].GetBinlogs())
@ -409,7 +409,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
}
ct := &compactionTask{Channel: channel, downloader: mockbIO, uploader: mockbIO}
_, _, _, _, err = ct.merge(context.Background(), allPaths, 2, 0, &etcdpb.CollectionMeta{
_, _, _, err = ct.merge(context.Background(), allPaths, 2, 0, &etcdpb.CollectionMeta{
Schema: &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
{DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{
{Key: "dim", Value: "64"},
@ -427,7 +427,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64)
var allPaths [][]string
inpath, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta)
inpath, _, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta)
assert.NoError(t, err)
assert.Equal(t, 12, len(inpath))
binlogNum := len(inpath[0].GetBinlogs())
@ -447,7 +447,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
ct := &compactionTask{Channel: channel, downloader: mockbIO, uploader: mockbIO}
_, _, _, _, err = ct.merge(context.Background(), allPaths, 2, 0, &etcdpb.CollectionMeta{
_, _, _, err = ct.merge(context.Background(), allPaths, 2, 0, &etcdpb.CollectionMeta{
Schema: &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
{DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{
{Key: "dim", Value: "dim"},
@ -659,11 +659,11 @@ func TestCompactorInterfaceMethods(t *testing.T) {
RowCount: 1,
}
cpaths1, err := mockbIO.upload(context.TODO(), c.segID1, c.parID, []*InsertData{iData1}, []byte{}, dData1, meta)
cpaths1, err := mockbIO.upload(context.TODO(), c.segID1, c.parID, []*InsertData{iData1}, dData1, meta)
require.NoError(t, err)
require.Equal(t, 12, len(cpaths1.inPaths))
cpaths2, err := mockbIO.upload(context.TODO(), c.segID2, c.parID, []*InsertData{iData2}, []byte{}, dData2, meta)
cpaths2, err := mockbIO.upload(context.TODO(), c.segID2, c.parID, []*InsertData{iData2}, dData2, meta)
require.NoError(t, err)
require.Equal(t, 12, len(cpaths2.inPaths))
@ -790,11 +790,11 @@ func TestCompactorInterfaceMethods(t *testing.T) {
RowCount: 0,
}
cpaths1, err := mockbIO.upload(context.TODO(), segID1, partID, []*InsertData{iData1}, []byte{}, dData1, meta)
cpaths1, err := mockbIO.upload(context.TODO(), segID1, partID, []*InsertData{iData1}, dData1, meta)
require.NoError(t, err)
require.Equal(t, 12, len(cpaths1.inPaths))
cpaths2, err := mockbIO.upload(context.TODO(), segID2, partID, []*InsertData{iData2}, []byte{}, dData2, meta)
cpaths2, err := mockbIO.upload(context.TODO(), segID2, partID, []*InsertData{iData2}, dData2, meta)
require.NoError(t, err)
require.Equal(t, 12, len(cpaths2.inPaths))
@ -842,11 +842,11 @@ type mockFlushManager struct {
var _ flushManager = (*mockFlushManager)(nil)
func (mfm *mockFlushManager) flushBufferData(data *BufferData, segStats []byte, segmentID UniqueID, flushed bool, dropped bool, pos *internalpb.MsgPosition) error {
func (mfm *mockFlushManager) flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, dropped bool, pos *internalpb.MsgPosition) ([]*Blob, error) {
if mfm.returnError {
return fmt.Errorf("mock error")
return nil, fmt.Errorf("mock error")
}
return nil
return nil, nil
}
func (mfm *mockFlushManager) flushDelData(data *DelDataBuf, segmentID UniqueID, pos *internalpb.MsgPosition) error {


@ -939,7 +939,11 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
numRows: req.GetNumOfRows(),
}
channel.(*ChannelMeta).initPKBloomFilter(ctx, targetSeg, req.GetStatsLogs(), tsoutil.GetCurrentTime())
err = channel.InitPKstats(ctx, targetSeg, req.GetStatsLogs(), tsoutil.GetCurrentTime())
if err != nil {
status.Reason = fmt.Sprintf("init pk stats fail, err=%s", err.Error())
return status, nil
}
// block all flow graph so it's safe to remove segment
ds.fg.Blockall()


@ -25,8 +25,6 @@ import (
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/mq/msgstream"
@ -252,24 +250,11 @@ func (dn *deleteNode) bufferDeleteMsg(msg *msgstream.DeleteMsg, tr TimeRange) ([
func (dn *deleteNode) filterSegmentByPK(partID UniqueID, pks []primaryKey, tss []Timestamp) (map[UniqueID][]primaryKey, map[UniqueID][]uint64) {
segID2Pks := make(map[UniqueID][]primaryKey)
segID2Tss := make(map[UniqueID][]uint64)
buf := make([]byte, 8)
segments := dn.channel.filterSegments(partID)
for index, pk := range pks {
for _, segment := range segments {
segmentID := segment.segmentID
exist := false
switch pk.Type() {
case schemapb.DataType_Int64:
int64Pk := pk.(*int64PrimaryKey)
common.Endian.PutUint64(buf, uint64(int64Pk.Value))
exist = segment.pkStat.pkFilter.Test(buf)
case schemapb.DataType_VarChar:
varCharPk := pk.(*varCharPrimaryKey)
exist = segment.pkStat.pkFilter.TestString(varCharPk.Value)
default:
//TODO::
}
if exist {
if segment.isPKExist(pk) {
segID2Pks[segmentID] = append(segID2Pks[segmentID], pk)
segID2Tss[segmentID] = append(segID2Tss[segmentID], tss[index])
}


@ -18,6 +18,7 @@ package datanode
import (
"context"
"fmt"
"testing"
"time"
@ -59,26 +60,45 @@ func TestFlowGraphDeleteNode_newDeleteNode(te *testing.T) {
}
func genMockChannel(segIDs []int64, pks []primaryKey, chanName string) *ChannelMeta {
buf := make([]byte, 8)
filter0 := bloom.NewWithEstimates(1000000, 0.01)
pkStat1 := &storage.PkStatistics{
PkFilter: bloom.NewWithEstimates(1000000, 0.01),
}
pkStat2 := &storage.PkStatistics{
PkFilter: bloom.NewWithEstimates(1000000, 0.01),
}
for i := 0; i < 3; i++ {
switch pks[i].Type() {
case schemapb.DataType_Int64:
common.Endian.PutUint64(buf, uint64(pks[i].(*int64PrimaryKey).Value))
filter0.Add(buf)
case schemapb.DataType_VarChar:
filter0.AddString(pks[i].(*varCharPrimaryKey).Value)
pkStat1.UpdateMinMax(pks[i])
buf := make([]byte, 8)
for _, pk := range pks {
switch pk.Type() {
case schemapb.DataType_Int64:
int64Value := pk.(*int64PrimaryKey).Value
common.Endian.PutUint64(buf, uint64(int64Value))
pkStat1.PkFilter.Add(buf)
case schemapb.DataType_VarChar:
stringValue := pk.(*varCharPrimaryKey).Value
pkStat1.PkFilter.AddString(stringValue)
default:
}
}
}
filter1 := bloom.NewWithEstimates(1000000, 0.01)
for i := 3; i < 5; i++ {
switch pks[i].Type() {
case schemapb.DataType_Int64:
common.Endian.PutUint64(buf, uint64(pks[i].(*int64PrimaryKey).Value))
filter1.Add(buf)
case schemapb.DataType_VarChar:
filter1.AddString(pks[i].(*varCharPrimaryKey).Value)
pkStat2.UpdateMinMax(pks[i])
buf := make([]byte, 8)
for _, pk := range pks {
switch pk.Type() {
case schemapb.DataType_Int64:
int64Value := pk.(*int64PrimaryKey).Value
common.Endian.PutUint64(buf, uint64(int64Value))
pkStat2.PkFilter.Add(buf)
case schemapb.DataType_VarChar:
stringValue := pk.(*varCharPrimaryKey).Value
pkStat2.PkFilter.AddString(stringValue)
default:
}
}
}
@ -101,9 +121,9 @@ func genMockChannel(segIDs []int64, pks []primaryKey, chanName string) *ChannelM
}
seg.setType(segTypes[i])
if i < 3 {
seg.pkStat.pkFilter = filter0
seg.currentStat = pkStat1
} else {
seg.pkStat.pkFilter = filter1
seg.currentStat = pkStat2
}
channel.segments[segIDs[i]] = &seg
}
@ -201,6 +221,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
assert.Nil(t, err)
segID2Pks, _ := dn.filterSegmentByPK(0, int64Pks, tss)
fmt.Println(segID2Pks)
expected := map[int64][]primaryKey{
segIDs[0]: int64Pks[0:3],
segIDs[1]: int64Pks[0:3],


@ -18,7 +18,6 @@ package datanode
import (
"context"
"errors"
"fmt"
"math"
"reflect"
@ -91,11 +90,6 @@ func (l *timeTickLogger) printLogs(start, end Timestamp) {
log.Debug("IBN timetick log", zap.Time("from", t1), zap.Time("to", t2), zap.Duration("elapsed", t2.Sub(t1)), zap.Uint64("start", start), zap.Uint64("end", end), zap.String("vChannelName", l.vChannelName))
}
type segmentCheckPoint struct {
numRows int64
pos internalpb.MsgPosition
}
func (ibNode *insertBufferNode) Name() string {
return "ibNode-" + ibNode.channelName
}
@ -398,20 +392,23 @@ func (ibNode *insertBufferNode) Sync(fgMsg *flowGraphMsg, seg2Upload []UniqueID,
zap.Any("position", endPosition),
zap.String("channel", ibNode.channelName),
)
segStats, err := ibNode.channel.getSegmentStatslog(task.segmentID)
if err != nil && !errors.Is(err, errSegmentStatsNotChanged) {
log.Error("failed to get segment stats log", zap.Int64("segmentID", task.segmentID), zap.Error(err))
panic(err)
}
err = retry.Do(ibNode.ctx, func() error {
return ibNode.flushManager.flushBufferData(task.buffer,
segStats,
// roll the segment's current stat into history using the pk stats produced by this flush
var pkStats []*storage.PrimaryKeyStats
err := retry.Do(ibNode.ctx, func() error {
statBlobs, err := ibNode.flushManager.flushBufferData(task.buffer,
task.segmentID,
task.flushed,
task.dropped,
endPosition)
if err != nil {
return err
}
pkStats, err = storage.DeserializeStats(statBlobs)
if err != nil {
log.Warn("failed to deserialize bloom filter files", zap.Error(err))
return err
}
return nil
}, getFlowGraphRetryOpt())
if err != nil {
metrics.DataNodeFlushBufferCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.FailLabel).Inc()
@ -426,6 +423,7 @@ func (ibNode *insertBufferNode) Sync(fgMsg *flowGraphMsg, seg2Upload []UniqueID,
}
segmentsToSync = append(segmentsToSync, task.segmentID)
ibNode.insertBuffer.Delete(task.segmentID)
ibNode.channel.RollPKstats(task.segmentID, pkStats)
metrics.DataNodeFlushBufferCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.SuccessLabel).Inc()
metrics.DataNodeFlushBufferCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.TotalLabel).Inc()
if task.auto {
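Condensed, the Sync path above now derives primary-key statistics from the stats blobs returned by the flush instead of asking the channel for a pre-built statslog, then rolls them into the segment's history. The fragment below is a recap of that sequence with error handling elided, not additional behavior:

statBlobs, _ := ibNode.flushManager.flushBufferData(task.buffer,
	task.segmentID, task.flushed, task.dropped, endPosition)
pkStats, _ := storage.DeserializeStats(statBlobs) // stats serialized during flush
ibNode.channel.RollPKstats(task.segmentID, pkStats) // currentStat -> historyStats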


@ -543,6 +543,138 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
})
}
func TestRollBF(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
testPath := "/test/datanode/root/meta"
err := clearEtcd(testPath)
require.NoError(t, err)
Params.EtcdCfg.MetaRootPath = testPath
Factory := &MetaFactory{}
collMeta := Factory.GetCollectionMeta(UniqueID(0), "coll1", schemapb.DataType_Int64)
dataFactory := NewDataFactory()
mockRootCoord := &RootCoordFactory{
pkType: schemapb.DataType_Int64,
}
channel := &ChannelMeta{
collectionID: collMeta.ID,
segments: make(map[UniqueID]*Segment),
}
channel.metaService = newMetaService(mockRootCoord, collMeta.ID)
factory := dependency.NewDefaultFactory(true)
flushPacks := []*segmentFlushPack{}
fpMut := sync.Mutex{}
wg := sync.WaitGroup{}
cm := storage.NewLocalChunkManager(storage.RootPath(insertNodeTestDir))
defer cm.RemoveWithPrefix(ctx, "")
fm := NewRendezvousFlushManager(NewAllocatorFactory(), cm, channel, func(pack *segmentFlushPack) {
fpMut.Lock()
flushPacks = append(flushPacks, pack)
fpMut.Unlock()
startPos := channel.listNewSegmentsStartPositions()
channel.transferNewSegments(lo.Map(startPos, func(pos *datapb.SegmentStartPosition, _ int) UniqueID {
return pos.GetSegmentID()
}))
if pack.flushed || pack.dropped {
channel.segmentFlushed(pack.segmentID)
}
wg.Done()
}, emptyFlushAndDropFunc)
flushChan := make(chan flushMsg, 100)
resendTTChan := make(chan resendTTMsg, 100)
c := &nodeConfig{
channel: channel,
msFactory: factory,
allocator: NewAllocatorFactory(),
vChannelName: "string",
}
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, flushChan, resendTTChan, fm, newCache(), c)
require.NoError(t, err)
// Auto flush number of rows set to 2
inMsg := genFlowGraphInsertMsg("")
inMsg.insertMessages = dataFactory.GetMsgStreamInsertMsgs(1)
var iMsg flowgraph.Msg = &inMsg
t.Run("Pure roll BF", func(t *testing.T) {
tmp := Params.DataNodeCfg.FlushInsertBufferSize
Params.DataNodeCfg.FlushInsertBufferSize = 4 * 4
defer func() {
Params.DataNodeCfg.FlushInsertBufferSize = tmp
}()
inMsg.startPositions = []*internalpb.MsgPosition{{Timestamp: 100}}
inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 123}}
type Test struct {
expectedSegID UniqueID
expectedNumOfRows int64
expectedStartPosTs Timestamp
expectedEndPosTs Timestamp
expectedCpNumOfRows int64
expectedCpPosTs Timestamp
}
iBNode.Operate([]flowgraph.Msg{iMsg})
assert.Equal(t, 0, len(flushPacks))
// should not be flushed with only 1 row
channel.segMu.Lock()
seg, ok := channel.segments[UniqueID(1)]
channel.segMu.Unlock()
assert.True(t, ok)
assert.Equal(t, datapb.SegmentType_New, seg.getType())
assert.Equal(t, int64(1), seg.numRows)
assert.Equal(t, uint64(100), seg.startPos.GetTimestamp())
assert.Equal(t, uint64(123), seg.endPos.GetTimestamp())
// because this is the original (full-capacity) bloom filter
assert.True(t, seg.currentStat.PkFilter.Cap() > uint(1000000))
inMsg.startPositions = []*internalpb.MsgPosition{{Timestamp: 200}}
inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 234}}
iMsg = &inMsg
// Trigger auto flush
output := iBNode.Operate([]flowgraph.Msg{iMsg})
fgm := output[0].(*flowGraphMsg)
wg.Add(len(fgm.segmentsToSync))
t.Log("segments to flush", fgm.segmentsToSync)
for _, im := range fgm.segmentsToSync {
// send del done signal
err = fm.flushDelData(nil, im, fgm.endPositions[0])
assert.NoError(t, err)
}
wg.Wait()
assert.Equal(t, 1, len(flushPacks))
assert.Less(t, 0, len(flushPacks[0].insertLogs))
assert.False(t, flushPacks[0].flushed)
assert.True(t, ok)
assert.Equal(t, datapb.SegmentType_Normal, seg.getType())
assert.Equal(t, int64(2), seg.numRows)
assert.Equal(t, uint64(100), seg.startPos.GetTimestamp())
assert.Equal(t, uint64(234), seg.endPos.GetTimestamp())
// filter should be rolled
assert.Nil(t, seg.currentStat)
assert.True(t, len(seg.historyStats) == 1)
assert.True(t, seg.historyStats[0].PkFilter.Cap() < 100)
})
}
type InsertBufferNodeSuit struct {
suite.Suite


@ -45,7 +45,7 @@ import (
// flushManager defines a flush manager signature
type flushManager interface {
// notify flush manager insert buffer data
flushBufferData(data *BufferData, segStats []byte, segmentID UniqueID, flushed bool, dropped bool, pos *internalpb.MsgPosition) error
flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, dropped bool, pos *internalpb.MsgPosition) ([]*Blob, error)
// notify flush manager del buffer data
flushDelData(data *DelDataBuf, segmentID UniqueID, pos *internalpb.MsgPosition) error
// injectFlush injects compaction or other blocking task before flush sync
@ -339,8 +339,7 @@ func (m *rendezvousFlushManager) handleDeleteTask(segmentID UniqueID, task flush
// flushBufferData notifies the flush manager of insert buffer data.
// This method will be retried on errors. Final errors are propagated upstream and logged.
func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segStats []byte, segmentID UniqueID, flushed bool,
dropped bool, pos *internalpb.MsgPosition) error {
func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, dropped bool, pos *internalpb.MsgPosition) ([]*Blob, error) {
tr := timerecord.NewTimeRecorder("flushDuration")
// empty flush
if data == nil || data.buffer == nil {
@ -348,12 +347,12 @@ func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segStats []by
// map[UniqueID]string{}, map[UniqueID]string{}, flushed, dropped, pos)
m.handleInsertTask(segmentID, &flushBufferInsertTask{}, map[UniqueID]*datapb.Binlog{}, map[UniqueID]*datapb.Binlog{},
flushed, dropped, pos)
return nil
return nil, nil
}
collID, partID, meta, err := m.getSegmentMeta(segmentID, pos)
if err != nil {
return err
return nil, err
}
// get memory size of buffer data
fieldMemorySize := make(map[int64]int)
@ -364,25 +363,24 @@ func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segStats []by
// encode data and convert output data
inCodec := storage.NewInsertCodec(meta)
binLogs, _, err := inCodec.Serialize(partID, segmentID, data.buffer)
binLogs, statsBinlogs, err := inCodec.Serialize(partID, segmentID, data.buffer)
if err != nil {
return err
return nil, err
}
// binlogs + 1 statslog
start, _, err := m.allocIDBatch(uint32(len(binLogs) + 1))
// binlogs
start, _, err := m.allocIDBatch(uint32(len(binLogs) + len(statsBinlogs)))
if err != nil {
return err
return nil, err
}
field2Insert := make(map[UniqueID]*datapb.Binlog, len(binLogs))
kvs := make(map[string][]byte, len(binLogs))
field2Logidx := make(map[UniqueID]UniqueID, len(binLogs))
for idx, blob := range binLogs {
fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64)
if err != nil {
log.Error("Flush failed ... cannot parse string to fieldID ..", zap.Error(err))
return err
return nil, err
}
logidx := start + int64(idx)
@ -400,29 +398,30 @@ func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segStats []by
LogPath: key,
LogSize: int64(fieldMemorySize[fieldID]),
}
field2Logidx[fieldID] = logidx
}
field2Stats := make(map[UniqueID]*datapb.Binlog)
// write stats binlog
// if segStats content is not nil, means segment stats changed
if len(segStats) > 0 {
pkID := getPKID(meta)
if pkID == common.InvalidFieldID {
return fmt.Errorf("failed to get pk id for segment %d", segmentID)
for idx, blob := range statsBinlogs {
fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64)
if err != nil {
log.Error("Flush failed ... cannot parse string to fieldID ..", zap.Error(err))
return nil, err
}
logidx := start + int64(len(binLogs))
k := metautil.JoinIDPath(collID, partID, segmentID, pkID, logidx)
logidx := start + UniqueID(len(binLogs)+idx)
// no error is raised if alloc=false
k := metautil.JoinIDPath(collID, partID, segmentID, fieldID, logidx)
key := path.Join(m.ChunkManager.RootPath(), common.SegmentStatslogPath, k)
kvs[key] = segStats
field2Stats[pkID] = &datapb.Binlog{
kvs[key] = blob.Value
field2Stats[fieldID] = &datapb.Binlog{
EntriesNum: 0,
TimestampFrom: 0, //TODO
TimestampTo: 0, //TODO,
LogPath: key,
LogSize: int64(len(segStats)),
LogSize: int64(len(blob.Value)),
}
}
@ -432,7 +431,7 @@ func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segStats []by
}, field2Insert, field2Stats, flushed, dropped, pos)
metrics.DataNodeEncodeBufferLatency.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID())).Observe(float64(tr.ElapseSpan().Milliseconds()))
return nil
return statsBinlogs, nil
}
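Stats binlogs now reuse the same per-field key scheme as insert binlogs, with log IDs drawn from the same allocation batch. A small worked example of the key construction in the loop above, using placeholder IDs; the resulting layout is roughly as shown in the trailing comment:

logidx := start + UniqueID(len(binLogs)+idx) // same ID batch as the insert binlogs
k := metautil.JoinIDPath(collID, partID, segmentID, fieldID, logidx)
key := path.Join(m.ChunkManager.RootPath(), common.SegmentStatslogPath, k)
// roughly: <rootPath>/<SegmentStatslogPath>/<collID>/<partID>/<segmentID>/<fieldID>/<logidx>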
// notify flush manager del buffer data


@ -174,7 +174,7 @@ func TestRendezvousFlushManager(t *testing.T) {
m.flushDelData(nil, 1, &internalpb.MsgPosition{
MsgID: ids[i],
})
m.flushBufferData(nil, nil, 1, true, false, &internalpb.MsgPosition{
m.flushBufferData(nil, 1, true, false, &internalpb.MsgPosition{
MsgID: ids[i],
})
wg.Done()
@ -223,7 +223,7 @@ func TestRendezvousFlushManager_Inject(t *testing.T) {
m.flushDelData(nil, 1, &internalpb.MsgPosition{
MsgID: ids[i],
})
m.flushBufferData(nil, nil, 1, true, false, &internalpb.MsgPosition{
m.flushBufferData(nil, 1, true, false, &internalpb.MsgPosition{
MsgID: ids[i],
})
wg.Done()
@ -238,10 +238,10 @@ func TestRendezvousFlushManager_Inject(t *testing.T) {
rand.Read(id)
id2 := make([]byte, 10)
rand.Read(id2)
m.flushBufferData(nil, nil, 2, true, false, &internalpb.MsgPosition{
m.flushBufferData(nil, 2, true, false, &internalpb.MsgPosition{
MsgID: id,
})
m.flushBufferData(nil, nil, 3, true, false, &internalpb.MsgPosition{
m.flushBufferData(nil, 3, true, false, &internalpb.MsgPosition{
MsgID: id2,
})
@ -266,7 +266,7 @@ func TestRendezvousFlushManager_Inject(t *testing.T) {
finish.Add(1)
rand.Read(id)
m.flushBufferData(nil, nil, 2, false, false, &internalpb.MsgPosition{
m.flushBufferData(nil, 2, false, false, &internalpb.MsgPosition{
MsgID: id,
})
ti = newTaskInjection(1, func(pack *segmentFlushPack) {
@ -356,7 +356,7 @@ func TestRendezvousFlushManager_waitForAllFlushQueue(t *testing.T) {
mut.RUnlock()
for i := 0; i < size/2; i++ {
m.flushBufferData(nil, nil, 1, true, false, &internalpb.MsgPosition{
m.flushBufferData(nil, 1, true, false, &internalpb.MsgPosition{
MsgID: ids[i],
})
}
@ -366,7 +366,7 @@ func TestRendezvousFlushManager_waitForAllFlushQueue(t *testing.T) {
mut.RUnlock()
for i := size / 2; i < size; i++ {
m.flushBufferData(nil, nil, 1, true, false, &internalpb.MsgPosition{
m.flushBufferData(nil, 1, true, false, &internalpb.MsgPosition{
MsgID: ids[i],
})
}
@ -400,7 +400,7 @@ func TestRendezvousFlushManager_dropMode(t *testing.T) {
})
halfMsgID := []byte{1, 1, 1}
m.flushBufferData(nil, nil, -1, true, false, &internalpb.MsgPosition{
m.flushBufferData(nil, -1, true, false, &internalpb.MsgPosition{
MsgID: halfMsgID,
})
@ -413,7 +413,7 @@ func TestRendezvousFlushManager_dropMode(t *testing.T) {
target := make(map[int64]struct{})
for i := 1; i < 11; i++ {
target[int64(i)] = struct{}{}
m.flushBufferData(nil, nil, int64(i), true, false, &internalpb.MsgPosition{
m.flushBufferData(nil, int64(i), true, false, &internalpb.MsgPosition{
MsgID: []byte{1},
})
m.flushDelData(nil, int64(i), &internalpb.MsgPosition{
@ -452,7 +452,7 @@ func TestRendezvousFlushManager_dropMode(t *testing.T) {
})
halfMsgID := []byte{1, 1, 1}
m.flushBufferData(nil, nil, -1, true, false, &internalpb.MsgPosition{
m.flushBufferData(nil, -1, true, false, &internalpb.MsgPosition{
MsgID: halfMsgID,
})
@ -473,7 +473,7 @@ func TestRendezvousFlushManager_dropMode(t *testing.T) {
})
for i := 1; i < 11; i++ {
m.flushBufferData(nil, nil, int64(i), true, false, &internalpb.MsgPosition{
m.flushBufferData(nil, int64(i), true, false, &internalpb.MsgPosition{
MsgID: []byte{1},
})
m.flushDelData(nil, int64(i), &internalpb.MsgPosition{
@ -520,7 +520,7 @@ func TestRendezvousFlushManager_close(t *testing.T) {
m.flushDelData(nil, 1, &internalpb.MsgPosition{
MsgID: ids[i],
})
m.flushBufferData(nil, nil, 1, true, false, &internalpb.MsgPosition{
m.flushBufferData(nil, 1, true, false, &internalpb.MsgPosition{
MsgID: ids[i],
})
wg.Done()

View File

@ -17,17 +17,9 @@
package datanode
import (
"encoding/json"
"errors"
"fmt"
"sync/atomic"
"github.com/bits-and-blooms/bloom/v3"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/storage"
@ -44,94 +36,13 @@ type Segment struct {
memorySize int64
compactedTo UniqueID
pkStat pkStatistics
currentStat *storage.PkStatistics
historyStats []*storage.PkStatistics
startPos *internalpb.MsgPosition // TODO readonly
endPos *internalpb.MsgPosition
}
// pkStatistics contains pk field statistic information
type pkStatistics struct {
statsChanged bool // statistic changed
pkFilter *bloom.BloomFilter // bloom filter of pk inside a segment
minPK primaryKey // minimal pk value, shortcut for checking whether a pk is inside this segment
maxPK primaryKey // maximal pk value, same above
}
// update set pk min/max value if input value is beyond former range.
func (st *pkStatistics) update(pk primaryKey) error {
if st == nil {
return errors.New("nil pk statistics")
}
if st.minPK == nil {
st.minPK = pk
} else if st.minPK.GT(pk) {
st.minPK = pk
}
if st.maxPK == nil {
st.maxPK = pk
} else if st.maxPK.LT(pk) {
st.maxPK = pk
}
return nil
}
func (st *pkStatistics) updatePKRange(ids storage.FieldData) error {
switch pks := ids.(type) {
case *storage.Int64FieldData:
buf := make([]byte, 8)
for _, pk := range pks.Data {
id := storage.NewInt64PrimaryKey(pk)
err := st.update(id)
if err != nil {
return err
}
common.Endian.PutUint64(buf, uint64(pk))
st.pkFilter.Add(buf)
}
case *storage.StringFieldData:
for _, pk := range pks.Data {
id := storage.NewVarCharPrimaryKey(pk)
err := st.update(id)
if err != nil {
return err
}
st.pkFilter.AddString(pk)
}
default:
return fmt.Errorf("invalid data type for primary key: %T", ids)
}
// mark statistic updated
st.statsChanged = true
return nil
}
// getStatslog return marshaled statslog content if there is any change since last call.
// statslog is marshaled as json.
func (st *pkStatistics) getStatslog(segmentID, pkID UniqueID, pkType schemapb.DataType) ([]byte, error) {
if !st.statsChanged {
return nil, fmt.Errorf("%w segment %d", errSegmentStatsNotChanged, segmentID)
}
pks := storage.PrimaryKeyStats{
FieldID: pkID,
PkType: int64(pkType),
MaxPk: st.maxPK,
MinPk: st.minPK,
BF: st.pkFilter,
}
bs, err := json.Marshal(pks)
if err == nil {
st.statsChanged = false
}
return bs, err
}
type addSegmentReq struct {
segType datapb.SegmentType
segID, collID, partitionID UniqueID
@ -142,10 +53,6 @@ type addSegmentReq struct {
importing bool
}
func (s *Segment) updatePk(pk primaryKey) error {
return s.pkStat.update(pk)
}
func (s *Segment) isValid() bool {
return s.getType() != datapb.SegmentType_Compacted
}
@ -162,23 +69,32 @@ func (s *Segment) setType(t datapb.SegmentType) {
s.sType.Store(t)
}
func (s *Segment) updatePKRange(ids storage.FieldData) error {
log := log.With(zap.Int64("collectionID", s.collectionID),
zap.Int64("partitionID", s.partitionID),
zap.Int64("segmentID", s.segmentID),
)
err := s.pkStat.updatePKRange(ids)
func (s *Segment) updatePKRange(ids storage.FieldData) {
s.InitCurrentStat()
err := s.currentStat.UpdatePKRange(ids)
if err != nil {
log.Warn("failed to updatePKRange", zap.Error(err))
panic(err)
}
}
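// InitCurrentStat lazily allocates the in-memory pk statistics (bloom filter plus min/max) for this segment.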
func (s *Segment) InitCurrentStat() {
if s.currentStat == nil {
s.currentStat = &storage.PkStatistics{
PkFilter: bloom.NewWithEstimates(storage.BloomFilterSize, storage.MaxBloomFalsePositive),
}
}
}
// isPKExist checks whether the pk may exist in the current or history pk statistics of this segment
func (s *Segment) isPKExist(pk primaryKey) bool {
if s.currentStat != nil && s.currentStat.PkExist(pk) {
return true
}
log.Info("update pk range",
zap.Int64("num_rows", s.numRows), zap.Any("minPK", s.pkStat.minPK), zap.Any("maxPK", s.pkStat.maxPK))
return nil
}
func (s *Segment) getSegmentStatslog(pkID UniqueID, pkType schemapb.DataType) ([]byte, error) {
return s.pkStat.getStatslog(s.segmentID, pkID, pkType)
for _, historyStats := range s.historyStats {
if historyStats.PkExist(pk) {
return true
}
}
return false
}
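A minimal sketch, in the spirit of the updated segment test that follows, of how the reworked datanode methods fit together; the pk values are illustrative only:

	seg := &Segment{}
	// record freshly inserted pks; currentStat is allocated lazily by InitCurrentStat
	seg.updatePKRange(&storage.Int64FieldData{Data: []int64{1, 2, 3}})
	// a delete for pk=2 may target this segment; pk=100 is outside the tracked range
	mayHit := seg.isPKExist(newInt64PrimaryKey(2))  // true
	miss := seg.isPKExist(newInt64PrimaryKey(100))  // false: 100 > MaxPK
	_, _ = mayHit, miss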

View File

@ -17,26 +17,16 @@
package datanode
import (
"encoding/json"
"math/rand"
"testing"
"time"
"github.com/bits-and-blooms/bloom/v3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/storage"
"github.com/stretchr/testify/assert"
)
func TestSegment_UpdatePKRange(t *testing.T) {
seg := &Segment{
pkStat: pkStatistics{
pkFilter: bloom.NewWithEstimates(100000, 0.005),
},
}
seg := &Segment{}
cases := make([]int64, 0, 100)
for i := 0; i < 100; i++ {
@ -50,57 +40,19 @@ func TestSegment_UpdatePKRange(t *testing.T) {
pk := newInt64PrimaryKey(c)
assert.Equal(t, true, seg.pkStat.minPK.LE(pk))
assert.Equal(t, true, seg.pkStat.maxPK.GE(pk))
assert.Equal(t, true, seg.currentStat.MinPK.LE(pk))
assert.Equal(t, true, seg.currentStat.MaxPK.GE(pk))
common.Endian.PutUint64(buf, uint64(c))
assert.True(t, seg.pkStat.pkFilter.Test(buf))
assert.True(t, seg.currentStat.PkFilter.Test(buf))
assert.True(t, seg.isPKExist(pk))
}
}
func TestSegment_getSegmentStatslog(t *testing.T) {
rand.Seed(time.Now().UnixNano())
func TestEmptySegment(t *testing.T) {
seg := &Segment{}
cases := make([][]int64, 0, 100)
for i := 0; i < 100; i++ {
tc := make([]int64, 0, 10)
for j := 0; j < 100; j++ {
tc = append(tc, rand.Int63())
}
cases = append(cases, tc)
}
buf := make([]byte, 8)
for _, tc := range cases {
seg := &Segment{
pkStat: pkStatistics{
pkFilter: bloom.NewWithEstimates(100000, 0.005),
}}
seg.updatePKRange(&storage.Int64FieldData{
Data: tc,
})
statBytes, err := seg.getSegmentStatslog(1, schemapb.DataType_Int64)
assert.NoError(t, err)
pks := storage.PrimaryKeyStats{}
err = json.Unmarshal(statBytes, &pks)
require.NoError(t, err)
assert.Equal(t, int64(1), pks.FieldID)
assert.Equal(t, int64(schemapb.DataType_Int64), pks.PkType)
for _, v := range tc {
pk := newInt64PrimaryKey(v)
assert.True(t, pks.MinPk.LE(pk))
assert.True(t, pks.MaxPk.GE(pk))
common.Endian.PutUint64(buf, uint64(v))
assert.True(t, seg.pkStat.pkFilter.Test(buf))
}
}
pks := &storage.PrimaryKeyStats{}
_, err := json.Marshal(pks)
assert.NoError(t, err)
pk := newInt64PrimaryKey(1000)
assert.False(t, seg.isPKExist(pk))
}

View File

@ -869,7 +869,7 @@ func (i *IndexCoord) DescribeIndex(ctx context.Context, req *indexpb.DescribeInd
return &indexpb.DescribeIndexResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_IndexNotExist,
Reason: "index not exist",
Reason: fmt.Sprintf("index doesn't exist, collectionID %d", req.CollectionID),
},
}, nil
}

View File

@ -459,7 +459,7 @@ func (sched *taskScheduler) processTask(t task, q taskQueue) {
err = t.Execute(ctx)
if err != nil {
trace.LogError(span, err)
log.Error("Failed to execute task: "+err.Error(),
log.Error("Failed to execute task: ", zap.Error(err),
zap.String("traceID", traceID))
return
}
@ -469,7 +469,7 @@ func (sched *taskScheduler) processTask(t task, q taskQueue) {
if err != nil {
trace.LogError(span, err)
log.Error("Failed to post-execute task: "+err.Error(),
log.Error("Failed to post-execute task: ", zap.Error(err),
zap.String("traceID", traceID))
return
}

View File

@ -25,6 +25,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/flowgraph"
)
@ -127,17 +128,61 @@ func TestFlowGraphDeleteNode_operate(t *testing.T) {
msg := []flowgraph.Msg{&dMsg}
deleteNode.Operate(msg)
s, err := historical.getSegmentByID(defaultSegmentID, segmentTypeSealed)
assert.NoError(t, err)
pks := make([]primaryKey, defaultMsgLength)
for i := 0; i < defaultMsgLength; i++ {
pks[i] = newInt64PrimaryKey(int64(i))
}
s.updateBloomFilter(pks)
assert.Nil(t, err)
buf := make([]byte, 8)
// load bf
value := make([]int64, defaultMsgLength)
for i := 0; i < defaultMsgLength; i++ {
common.Endian.PutUint64(buf, uint64(i))
assert.True(t, s.pkFilter.Test(buf))
value[i] = int64(i)
}
data := &storage.Int64FieldData{
Data: value,
}
statsWriter := &storage.StatsWriter{}
err = statsWriter.GeneratePrimaryKeyStats(1, schemapb.DataType_Int64, data)
assert.NoError(t, err)
sr := &storage.StatsReader{}
sr.SetBuffer(statsWriter.GetBuffer())
stat, err := sr.GetPrimaryKeyStats()
assert.NoError(t, err)
s, err = historical.getSegmentByID(defaultSegmentID, segmentTypeSealed)
assert.NoError(t, err)
pkStat := &storage.PkStatistics{
PkFilter: stat.BF,
MinPK: stat.MinPk,
MaxPK: stat.MaxPk,
}
s.historyStats = append(s.historyStats, pkStat)
// another bf
for i := 0; i < defaultMsgLength; i++ {
value[i] = int64(i + defaultMsgLength)
}
err = statsWriter.GeneratePrimaryKeyStats(1, schemapb.DataType_Int64, data)
assert.NoError(t, err)
sr.SetBuffer(statsWriter.GetBuffer())
stat, err = sr.GetPrimaryKeyStats()
assert.NoError(t, err)
s, err = historical.getSegmentByID(defaultSegmentID, segmentTypeSealed)
assert.NoError(t, err)
pkStat = &storage.PkStatistics{
PkFilter: stat.BF,
MinPK: stat.MinPk,
MaxPK: stat.MaxPk,
}
s.historyStats = append(s.historyStats, pkStat)
assert.Nil(t, err)
for i := 0; i < defaultMsgLength*2; i++ {
assert.True(t, s.isPKExist(newInt64PrimaryKey(int64(i))))
}
assert.False(t, s.isPKExist(newInt64PrimaryKey(int64(defaultMsgLength*2+1))))
})
t.Run("test invalid partitionID", func(t *testing.T) {

View File

@ -355,21 +355,8 @@ func filterSegmentsByPKs(pks []primaryKey, timestamps []Timestamp, segment *Segm
retPks := make([]primaryKey, 0)
retTss := make([]Timestamp, 0)
buf := make([]byte, 8)
for index, pk := range pks {
exist := false
switch pk.Type() {
case schemapb.DataType_Int64:
int64Pk := pk.(*int64PrimaryKey)
common.Endian.PutUint64(buf, uint64(int64Pk.Value))
exist = segment.pkFilter.Test(buf)
case schemapb.DataType_VarChar:
varCharPk := pk.(*varCharPrimaryKey)
exist = segment.pkFilter.TestString(varCharPk.Value)
default:
return nil, nil, fmt.Errorf("invalid data type of delete primary keys")
}
if exist {
if segment.isPKExist(pk) {
retPks = append(retPks, pk)
retTss = append(retTss, timestamps[index])
}

View File

@ -25,11 +25,13 @@ import (
"github.com/bits-and-blooms/bloom/v3"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/flowgraph"
"go.uber.org/atomic"
)
func getInsertNode() (*insertNode, error) {
@ -251,10 +253,8 @@ func TestFlowGraphInsertNode_operate(t *testing.T) {
insertNode.Operate(msg)
s, err := insertNode.metaReplica.getSegmentByID(defaultSegmentID, segmentTypeGrowing)
assert.Nil(t, err)
buf := make([]byte, 8)
for i := 0; i < defaultMsgLength; i++ {
common.Endian.PutUint64(buf, uint64(i))
assert.True(t, s.pkFilter.Test(buf))
assert.True(t, s.isPKExist(newInt64PrimaryKey(int64(i))))
}
})
@ -360,7 +360,7 @@ func TestFlowGraphInsertNode_operate(t *testing.T) {
})
}
func TestFilterSegmentsByPKs(t *testing.T) {
func TestFilterSealedSegmentsByPKs(t *testing.T) {
t.Run("filter int64 pks", func(t *testing.T) {
buf := make([]byte, 8)
filter := bloom.NewWithEstimates(1000000, 0.01)
@ -368,10 +368,17 @@ func TestFilterSegmentsByPKs(t *testing.T) {
common.Endian.PutUint64(buf, uint64(i))
filter.Add(buf)
}
segment := &Segment{
segmentID: 1,
pkFilter: filter,
stat := &storage.PkStatistics{
PkFilter: filter,
MinPK: storage.NewInt64PrimaryKey(0),
MaxPK: storage.NewInt64PrimaryKey(2),
}
segment := &Segment{
segmentID: 1,
segmentType: atomic.NewInt32(0),
historyStats: []*storage.PkStatistics{stat},
}
segment.setType(commonpb.SegmentState_Sealed)
pk0 := newInt64PrimaryKey(0)
pk1 := newInt64PrimaryKey(1)
@ -398,11 +405,94 @@ func TestFilterSegmentsByPKs(t *testing.T) {
for i := 0; i < 3; i++ {
filter.AddString(fmt.Sprintf("test%d", i))
}
segment := &Segment{
segmentID: 1,
pkFilter: filter,
stat := &storage.PkStatistics{
PkFilter: filter,
MinPK: storage.NewVarCharPrimaryKey("test0"),
MaxPK: storage.NewVarCharPrimaryKey("test2"),
}
segment := &Segment{
segmentID: 1,
segmentType: atomic.NewInt32(0),
historyStats: []*storage.PkStatistics{stat},
}
segment.setType(commonpb.SegmentState_Sealed)
pk0 := newVarCharPrimaryKey("test0")
pk1 := newVarCharPrimaryKey("test1")
pk2 := newVarCharPrimaryKey("test2")
pk3 := newVarCharPrimaryKey("test3")
pk4 := newVarCharPrimaryKey("test4")
timestamps := []uint64{1, 1, 1, 1, 1}
pks, _, err := filterSegmentsByPKs([]primaryKey{pk0, pk1, pk2, pk3, pk4}, timestamps, segment)
assert.Nil(t, err)
assert.Equal(t, len(pks), 3)
pks, _, err = filterSegmentsByPKs([]primaryKey{}, timestamps, segment)
assert.Nil(t, err)
assert.Equal(t, len(pks), 0)
_, _, err = filterSegmentsByPKs(nil, timestamps, segment)
assert.NoError(t, err)
_, _, err = filterSegmentsByPKs([]primaryKey{pk0, pk1, pk2, pk3, pk4}, timestamps, nil)
assert.NotNil(t, err)
})
}
func TestFilterGrowingSegmentsByPKs(t *testing.T) {
t.Run("filter int64 pks", func(t *testing.T) {
buf := make([]byte, 8)
filter := bloom.NewWithEstimates(1000000, 0.01)
for i := 0; i < 3; i++ {
common.Endian.PutUint64(buf, uint64(i))
filter.Add(buf)
}
stat := &storage.PkStatistics{
PkFilter: filter,
MinPK: storage.NewInt64PrimaryKey(0),
MaxPK: storage.NewInt64PrimaryKey(2),
}
segment := &Segment{
segmentID: 1,
segmentType: atomic.NewInt32(0),
historyStats: []*storage.PkStatistics{stat},
}
segment.setType(commonpb.SegmentState_Growing)
pk0 := newInt64PrimaryKey(0)
pk1 := newInt64PrimaryKey(1)
pk2 := newInt64PrimaryKey(2)
pk3 := newInt64PrimaryKey(3)
pk4 := newInt64PrimaryKey(4)
timestamps := []uint64{1, 1, 1, 1, 1}
pks, _, err := filterSegmentsByPKs([]primaryKey{pk0, pk1, pk2, pk3, pk4}, timestamps, segment)
assert.Nil(t, err)
assert.Equal(t, len(pks), 3)
pks, _, err = filterSegmentsByPKs([]primaryKey{}, timestamps, segment)
assert.Nil(t, err)
assert.Equal(t, len(pks), 0)
_, _, err = filterSegmentsByPKs(nil, timestamps, segment)
assert.NoError(t, err)
_, _, err = filterSegmentsByPKs([]primaryKey{pk0, pk1, pk2, pk3, pk4}, timestamps, nil)
assert.NotNil(t, err)
})
t.Run("filter varChar pks", func(t *testing.T) {
filter := bloom.NewWithEstimates(1000000, 0.01)
for i := 0; i < 3; i++ {
filter.AddString(fmt.Sprintf("test%d", i))
}
stat := &storage.PkStatistics{
PkFilter: filter,
MinPK: storage.NewVarCharPrimaryKey("test0"),
MaxPK: storage.NewVarCharPrimaryKey("test2"),
}
segment := &Segment{
segmentID: 1,
segmentType: atomic.NewInt32(0),
historyStats: []*storage.PkStatistics{stat},
}
segment.setType(commonpb.SegmentState_Growing)
pk0 := newVarCharPrimaryKey("test0")
pk1 := newVarCharPrimaryKey("test1")
pk2 := newVarCharPrimaryKey("test2")

View File

@ -38,10 +38,10 @@ import (
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/bits-and-blooms/bloom/v3"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/util/timerecord"
"github.com/bits-and-blooms/bloom/v3"
"github.com/golang/protobuf/proto"
"go.uber.org/atomic"
"go.uber.org/zap"
@ -63,11 +63,6 @@ const (
segmentTypeSealed = commonpb.SegmentState_Sealed
)
const (
bloomFilterSize uint = 100000
maxBloomFalsePositive float64 = 0.005
)
var (
ErrSegmentUnhealthy = errors.New("segment unhealthy")
)
@ -100,7 +95,9 @@ type Segment struct {
indexedFieldInfos *typeutil.ConcurrentMap[UniqueID, *IndexedFieldInfo]
pkFilter *bloom.BloomFilter // bloom filter of pk inside a segment
// only used by sealed segments
currentStat *storage.PkStatistics
historyStats []*storage.PkStatistics
pool *concurrency.Pool
}
@ -219,9 +216,8 @@ func newSegment(collection *Collection,
indexedFieldInfos: typeutil.NewConcurrentMap[int64, *IndexedFieldInfo](),
recentlyModified: atomic.NewBool(false),
destroyed: atomic.NewBool(false),
pkFilter: bloom.NewWithEstimates(bloomFilterSize, maxBloomFalsePositive),
pool: pool,
historyStats: []*storage.PkStatistics{},
pool: pool,
}
return segment, nil
@ -249,6 +245,9 @@ func deleteSegment(segment *Segment) {
return nil, nil
}).Await()
segment.currentStat = nil
segment.historyStats = nil
log.Info("delete segment from memory",
zap.Int64("collectionID", segment.collectionID),
zap.Int64("partitionID", segment.partitionID),
@ -619,22 +618,48 @@ func (s *Segment) fillIndexedFieldsData(ctx context.Context, collectionID Unique
}
func (s *Segment) updateBloomFilter(pks []primaryKey) {
s.InitCurrentStat()
buf := make([]byte, 8)
for _, pk := range pks {
s.currentStat.UpdateMinMax(pk)
switch pk.Type() {
case schemapb.DataType_Int64:
int64Value := pk.(*int64PrimaryKey).Value
common.Endian.PutUint64(buf, uint64(int64Value))
s.pkFilter.Add(buf)
s.currentStat.PkFilter.Add(buf)
case schemapb.DataType_VarChar:
stringValue := pk.(*varCharPrimaryKey).Value
s.pkFilter.AddString(stringValue)
s.currentStat.PkFilter.AddString(stringValue)
default:
log.Warn("failed to update bloomfilter", zap.Any("PK type", pk.Type()))
log.Error("failed to update bloomfilter", zap.Any("PK type", pk.Type()))
panic("failed to update bloomfilter")
}
}
}
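// InitCurrentStat lazily allocates the pk statistics used to track pks observed on the growing path.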
func (s *Segment) InitCurrentStat() {
if s.currentStat == nil {
s.currentStat = &storage.PkStatistics{
PkFilter: bloom.NewWithEstimates(storage.BloomFilterSize, storage.MaxBloomFalsePositive),
}
}
}
// isPKExist checks whether the pk may exist in the current or history pk statistics of this segment
func (s *Segment) isPKExist(pk primaryKey) bool {
if s.currentStat != nil && s.currentStat.PkExist(pk) {
return true
}
// for sealed segments, if any history stats reports a possible hit, the pk has to be treated as existing
for _, historyStat := range s.historyStats {
if historyStat.PkExist(pk) {
return true
}
}
return false
}
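For orientation, a hedged sketch of how the two stat sources are expected to be populated on the querynode; segment construction is elided, the variable names are hypothetical, and stat stands for a PrimaryKeyStats deserialized from a statslog as in the loader change below:

	// growing path: pks observed on the insert flow graph feed currentStat
	growing.updateBloomFilter([]primaryKey{newInt64PrimaryKey(7), newInt64PrimaryKey(8)})
	// sealed path: each statslog read by the segment loader becomes one history entry
	sealed.historyStats = append(sealed.historyStats, &storage.PkStatistics{
		PkFilter: stat.BF,
		MinPK:    stat.MinPk,
		MaxPK:    stat.MaxPk,
	})
	// both paths answer delete-filtering lookups through the same check
	_ = growing.isPKExist(newInt64PrimaryKey(7)) // true
	_ = sealed.isPKExist(newInt64PrimaryKey(7))  // depends on the loaded stats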
//-------------------------------------------------------------------------------------- interfaces for growing segment
func (s *Segment) segmentPreInsert(numOfRecords int) (int64, error) {
/*

View File

@ -628,6 +628,7 @@ func (loader *segmentLoader) loadSegmentBloomFilter(ctx context.Context, segment
return nil
}
startTs := time.Now()
values, err := loader.cm.MultiRead(ctx, binlogPaths)
if err != nil {
return err
@ -642,22 +643,17 @@ func (loader *segmentLoader) loadSegmentBloomFilter(ctx context.Context, segment
log.Warn("failed to deserialize stats", zap.Error(err))
return err
}
// just one BF, just use it
if len(stats) == 1 && stats[0].BF != nil {
segment.pkFilter = stats[0].BF
return nil
}
// legacy merge
var size uint
for _, stat := range stats {
if stat.BF == nil {
log.Warn("stat log with nil bloom filter", zap.Int64("segmentID", segment.segmentID), zap.Any("stat", stat))
continue
}
err = segment.pkFilter.Merge(stat.BF)
if err != nil {
return err
pkStat := &storage.PkStatistics{
PkFilter: stat.BF,
MinPK: stat.MinPk,
MaxPK: stat.MaxPk,
}
size += stat.BF.Cap()
segment.historyStats = append(segment.historyStats, pkStat)
}
log.Info("Successfully load pk stats", zap.Any("time", time.Since(startTs)), zap.Int64("segment", segment.segmentID), zap.Uint("size", size))
return nil
}

View File

@ -1024,21 +1024,22 @@ func TestUpdateBloomFilter(t *testing.T) {
defaultCollectionID,
defaultDMLChannel,
defaultSegmentVersion,
segmentTypeSealed)
segmentTypeGrowing)
assert.NoError(t, err)
seg, err := replica.getSegmentByID(defaultSegmentID, segmentTypeSealed)
seg, err := replica.getSegmentByID(defaultSegmentID, segmentTypeGrowing)
assert.Nil(t, err)
pkValues := []int64{1, 2}
pkValues := []int64{1, 3}
pks := make([]primaryKey, len(pkValues))
for index, v := range pkValues {
pks[index] = newInt64PrimaryKey(v)
}
seg.updateBloomFilter(pks)
buf := make([]byte, 8)
for _, v := range pkValues {
common.Endian.PutUint64(buf, uint64(v))
assert.True(t, seg.pkFilter.Test(buf))
assert.True(t, seg.isPKExist(storage.NewInt64PrimaryKey(v)))
}
assert.False(t, seg.isPKExist(storage.NewInt64PrimaryKey(0)))
assert.False(t, seg.isPKExist(storage.NewInt64PrimaryKey(2)))
assert.False(t, seg.isPKExist(storage.NewInt64PrimaryKey(4)))
})
t.Run("test string pk", func(t *testing.T) {
replica, err := genSimpleReplica()
@ -1048,19 +1049,22 @@ func TestUpdateBloomFilter(t *testing.T) {
defaultCollectionID,
defaultDMLChannel,
defaultSegmentVersion,
segmentTypeSealed)
segmentTypeGrowing)
assert.NoError(t, err)
seg, err := replica.getSegmentByID(defaultSegmentID, segmentTypeSealed)
seg, err := replica.getSegmentByID(defaultSegmentID, segmentTypeGrowing)
assert.Nil(t, err)
pkValues := []string{"test1", "test2"}
pkValues := []string{"test1", "test3"}
pks := make([]primaryKey, len(pkValues))
for index, v := range pkValues {
pks[index] = newVarCharPrimaryKey(v)
}
seg.updateBloomFilter(pks)
for _, v := range pkValues {
assert.True(t, seg.pkFilter.TestString(v))
assert.True(t, seg.isPKExist(storage.NewVarCharPrimaryKey(v)))
}
assert.False(t, seg.isPKExist(storage.NewVarCharPrimaryKey("test0")))
assert.False(t, seg.isPKExist(storage.NewVarCharPrimaryKey("test2")))
assert.False(t, seg.isPKExist(storage.NewVarCharPrimaryKey("test4")))
})
}

View File

@ -434,7 +434,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
// stats fields
if field.GetIsPrimaryKey() {
statsWriter := &StatsWriter{}
err = statsWriter.generatePrimaryKeyStats(field.FieldID, field.DataType, singleData)
err = statsWriter.GeneratePrimaryKeyStats(field.FieldID, field.DataType, singleData)
if err != nil {
return nil, nil, err
}

View File

@ -0,0 +1,108 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"errors"
"fmt"
"github.com/bits-and-blooms/bloom/v3"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/common"
)
// PkStatistics contains pk field statistic information
type PkStatistics struct {
PkFilter *bloom.BloomFilter // bloom filter of pk inside a segment
MinPK PrimaryKey // minimal pk value, shortcut for checking whether a pk is inside this segment
MaxPK PrimaryKey // maximal pk value, same above
}
// UpdateMinMax sets the pk min/max values if the input value is beyond the former range.
func (st *PkStatistics) UpdateMinMax(pk PrimaryKey) error {
if st == nil {
return errors.New("nil pk statistics")
}
if st.MinPK == nil {
st.MinPK = pk
} else if st.MinPK.GT(pk) {
st.MinPK = pk
}
if st.MaxPK == nil {
st.MaxPK = pk
} else if st.MaxPK.LT(pk) {
st.MaxPK = pk
}
return nil
}
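// UpdatePKRange adds every pk in the given field data to the bloom filter and refreshes the min/max range.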
func (st *PkStatistics) UpdatePKRange(ids FieldData) error {
switch pks := ids.(type) {
case *Int64FieldData:
buf := make([]byte, 8)
for _, pk := range pks.Data {
id := NewInt64PrimaryKey(pk)
err := st.UpdateMinMax(id)
if err != nil {
return err
}
common.Endian.PutUint64(buf, uint64(pk))
st.PkFilter.Add(buf)
}
case *StringFieldData:
for _, pk := range pks.Data {
id := NewVarCharPrimaryKey(pk)
err := st.UpdateMinMax(id)
if err != nil {
return err
}
st.PkFilter.AddString(pk)
}
default:
return fmt.Errorf("invalid data type for primary key: %T", ids)
}
return nil
}
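// PkExist reports whether the pk may exist in this statistics; false means definitely absent, true may be a bloom filter false positive.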
func (st *PkStatistics) PkExist(pk PrimaryKey) bool {
// empty PkStatistics
if st.MinPK == nil || st.MaxPK == nil || st.PkFilter == nil {
return false
}
// check pk range first, ugly but keep it for now
if st.MinPK.GT(pk) || st.MaxPK.LT(pk) {
return false
}
// if in range, check bloom filter
switch pk.Type() {
case schemapb.DataType_Int64:
buf := make([]byte, 8)
int64Pk := pk.(*Int64PrimaryKey)
common.Endian.PutUint64(buf, uint64(int64Pk.Value))
return st.PkFilter.Test(buf)
case schemapb.DataType_VarChar:
varCharPk := pk.(*VarCharPrimaryKey)
return st.PkFilter.TestString(varCharPk.Value)
default:
//TODO::
}
// unknown pk type, conservatively report a possible hit (false positive)
return true
}
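A minimal usage sketch of the new helper, written as if it sits inside the storage package (it additionally needs the fmt and bloom imports; the pk values are illustrative only):

	st := &PkStatistics{
		PkFilter: bloom.NewWithEstimates(BloomFilterSize, MaxBloomFalsePositive),
	}
	// track a batch of int64 pks: refreshes min/max and feeds the bloom filter
	if err := st.UpdatePKRange(&Int64FieldData{Data: []int64{10, 20, 30}}); err != nil {
		panic(err)
	}
	fmt.Println(st.PkExist(NewInt64PrimaryKey(20))) // true
	fmt.Println(st.PkExist(NewInt64PrimaryKey(99))) // false: outside [10, 30]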

View File

@ -26,8 +26,8 @@ import (
const (
// TODO silverxia maybe need set from config
bloomFilterSize uint = 100000
maxBloomFalsePositive float64 = 0.005
BloomFilterSize uint = 100000
MaxBloomFalsePositive float64 = 0.005
)
// PrimaryKeyStats contains statistics data for pk column
@ -109,8 +109,8 @@ func (stats *PrimaryKeyStats) UnmarshalJSON(data []byte) error {
}
}
stats.BF = bloom.NewWithEstimates(bloomFilterSize, maxBloomFalsePositive)
if bfMessage, ok := messageMap["bf"]; ok && bfMessage != nil {
stats.BF = &bloom.BloomFilter{}
err = stats.BF.UnmarshalJSON(*bfMessage)
if err != nil {
return err
@ -145,14 +145,14 @@ func (sw *StatsWriter) GetBuffer() []byte {
return sw.buffer
}
// generatePrimaryKeyStats writes Int64Stats from @msgs with @fieldID to @buffer
func (sw *StatsWriter) generatePrimaryKeyStats(fieldID int64, pkType schemapb.DataType, msgs FieldData) error {
// GeneratePrimaryKeyStats writes the pk stats (min/max and bloom filter) from @msgs with @fieldID to @buffer
func (sw *StatsWriter) GeneratePrimaryKeyStats(fieldID int64, pkType schemapb.DataType, msgs FieldData) error {
stats := &PrimaryKeyStats{
FieldID: fieldID,
PkType: int64(pkType),
}
stats.BF = bloom.NewWithEstimates(bloomFilterSize, maxBloomFalsePositive)
stats.BF = bloom.NewWithEstimates(uint(msgs.RowNum()), MaxBloomFalsePositive)
switch pkType {
case schemapb.DataType_Int64:
data := msgs.(*Int64FieldData).Data
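The bloom filter is now sized from the actual row count instead of the fixed 100k estimate, so small segments allocate proportionally smaller filters. A rough illustration, as a fragment assumed to run inside the storage package with arbitrary row counts:

	small := bloom.NewWithEstimates(1000, MaxBloomFalsePositive)
	large := bloom.NewWithEstimates(1000000, MaxBloomFalsePositive)
	// Cap() returns the bit capacity; for a fixed false-positive rate it grows linearly with the row count
	fmt.Println(small.Cap(), large.Cap())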

View File

@ -18,6 +18,7 @@ package storage
import (
"encoding/json"
"fmt"
"testing"
"github.com/bits-and-blooms/bloom/v3"
@ -32,7 +33,7 @@ func TestStatsWriter_Int64PrimaryKey(t *testing.T) {
Data: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9},
}
sw := &StatsWriter{}
err := sw.generatePrimaryKeyStats(common.RowIDField, schemapb.DataType_Int64, data)
err := sw.GeneratePrimaryKeyStats(common.RowIDField, schemapb.DataType_Int64, data)
assert.NoError(t, err)
b := sw.GetBuffer()
@ -57,16 +58,45 @@ func TestStatsWriter_Int64PrimaryKey(t *testing.T) {
msgs := &Int64FieldData{
Data: []int64{},
}
err = sw.generatePrimaryKeyStats(common.RowIDField, schemapb.DataType_Int64, msgs)
err = sw.GeneratePrimaryKeyStats(common.RowIDField, schemapb.DataType_Int64, msgs)
assert.Nil(t, err)
}
func TestStatsWriter_BF(t *testing.T) {
value := make([]int64, 1000000)
for i := 0; i < 1000000; i++ {
value[i] = int64(i)
}
data := &Int64FieldData{
Data: value,
}
fmt.Println(data.RowNum())
sw := &StatsWriter{}
err := sw.GeneratePrimaryKeyStats(common.RowIDField, schemapb.DataType_Int64, data)
assert.NoError(t, err)
stats := &PrimaryKeyStats{}
stats.UnmarshalJSON(sw.buffer)
buf := make([]byte, 8)
for i := 0; i < 1000000; i++ {
common.Endian.PutUint64(buf, uint64(i))
assert.True(t, stats.BF.Test(buf))
}
common.Endian.PutUint64(buf, uint64(1000001))
assert.False(t, stats.BF.Test(buf))
assert.True(t, stats.MinPk.EQ(NewInt64PrimaryKey(0)))
assert.True(t, stats.MaxPk.EQ(NewInt64PrimaryKey(999999)))
}
func TestStatsWriter_VarCharPrimaryKey(t *testing.T) {
data := &StringFieldData{
Data: []string{"bc", "ac", "abd", "cd", "milvus"},
}
sw := &StatsWriter{}
err := sw.generatePrimaryKeyStats(common.RowIDField, schemapb.DataType_VarChar, data)
err := sw.GeneratePrimaryKeyStats(common.RowIDField, schemapb.DataType_VarChar, data)
assert.NoError(t, err)
b := sw.GetBuffer()
@ -85,7 +115,7 @@ func TestStatsWriter_VarCharPrimaryKey(t *testing.T) {
msgs := &Int64FieldData{
Data: []int64{},
}
err = sw.generatePrimaryKeyStats(common.RowIDField, schemapb.DataType_Int64, msgs)
err = sw.GeneratePrimaryKeyStats(common.RowIDField, schemapb.DataType_Int64, msgs)
assert.Nil(t, err)
}
@ -98,7 +128,7 @@ func TestStatsWriter_UpgradePrimaryKey(t *testing.T) {
FieldID: common.RowIDField,
Min: 1,
Max: 9,
BF: bloom.NewWithEstimates(bloomFilterSize, maxBloomFalsePositive),
BF: bloom.NewWithEstimates(100000, 0.05),
}
b := make([]byte, 8)