diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go
index e8b89000cf..bb36534a4d 100644
--- a/internal/datacoord/meta.go
+++ b/internal/datacoord/meta.go
@@ -302,17 +302,10 @@ func (m *meta) UpdateFlushSegmentsInfo(
 		}
 	}
 	clonedSegment.Binlogs = currBinlogs
-	// statlogs
-	currStatsLogs := clonedSegment.GetStatslogs()
-	for _, tStatsLogs := range statslogs {
-		fieldStatsLog := getFieldBinlogs(tStatsLogs.GetFieldID(), currStatsLogs)
-		if fieldStatsLog == nil {
-			currStatsLogs = append(currStatsLogs, tStatsLogs)
-		} else {
-			fieldStatsLog.Binlogs = append(fieldStatsLog.Binlogs, tStatsLogs.Binlogs...)
-		}
+	// statslogs: overwrite the segment statslogs with the latest ones
+	if len(statslogs) > 0 {
+		clonedSegment.Statslogs = statslogs
 	}
-	clonedSegment.Statslogs = currStatsLogs
 	// deltalogs
 	currDeltaLogs := clonedSegment.GetDeltalogs()
 	for _, tDeltaLogs := range deltalogs {
diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go
index 78b4b8f7da..21f9a782b2 100644
--- a/internal/datacoord/server.go
+++ b/internal/datacoord/server.go
@@ -27,6 +27,7 @@ import (
 	"syscall"
 	"time"
 
+	"github.com/blang/semver/v4"
 	"github.com/minio/minio-go/v7"
 	clientv3 "go.etcd.io/etcd/client/v3"
 	"go.uber.org/zap"
@@ -413,7 +414,8 @@ var getCheckBucketFn = func(cli *minio.Client) func() error {
 }
 
 func (s *Server) initServiceDiscovery() error {
-	sessions, rev, err := s.session.GetSessions(typeutil.DataNodeRole)
+	r := semver.MustParseRange(">=2.1.2")
+	sessions, rev, err := s.session.GetSessionsWithVersionRange(typeutil.DataNodeRole, r)
 	if err != nil {
 		log.Warn("DataCoord failed to init service discovery", zap.Error(err))
 		return err
@@ -432,7 +434,7 @@ func (s *Server) initServiceDiscovery() error {
 	s.cluster.Startup(s.ctx, datanodes)
 
 	// TODO implement rewatch logic
-	s.dnEventCh = s.session.WatchServices(typeutil.DataNodeRole, rev+1, nil)
+	s.dnEventCh = s.session.WatchServicesWithVersionRange(typeutil.DataNodeRole, r, rev+1, nil)
 
 	//icSessions, icRevision, err := s.session.GetSessions(typeutil.IndexCoordRole)
 	//if err != nil {
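The server.go change gates service discovery on a semver range: only DataNodes reporting version 2.1.2 or newer, i.e. nodes that produce the single-statslog format introduced by this patch, are listed and watched. In blang/semver/v4 a parsed range is simply a predicate over versions; a minimal standalone sketch (not Milvus code) of how the range behaves:

package main

import (
	"fmt"

	"github.com/blang/semver/v4"
)

func main() {
	// The same range expression the diff passes to GetSessionsWithVersionRange.
	r := semver.MustParseRange(">=2.1.2")

	for _, raw := range []string{"2.1.1", "2.1.2", "2.2.0"} {
		v := semver.MustParse(raw)
		// A Range is func(semver.Version) bool, so filtering is a direct call.
		fmt.Printf("datanode %s admitted: %v\n", raw, r(v))
	}
	// Output:
	// datanode 2.1.1 admitted: false
	// datanode 2.1.2 admitted: true
	// datanode 2.2.0 admitted: true
}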
diff --git a/internal/datanode/binlog_io.go b/internal/datanode/binlog_io.go
index 4a1dc3a753..a1574e4331 100644
--- a/internal/datanode/binlog_io.go
+++ b/internal/datanode/binlog_io.go
@@ -51,7 +51,7 @@ type uploader interface {
 	//
 	// errUploadToBlobStorage is returned if ctx is canceled from outside while an upload is in progress.
 	// Beware of the ctx here: if no timeout or cancel is applied to it, this upload may retry forever.
-	upload(ctx context.Context, segID, partID UniqueID, iData []*InsertData, dData *DeleteData, meta *etcdpb.CollectionMeta) (*segPaths, error)
+	upload(ctx context.Context, segID, partID UniqueID, iData []*InsertData, segStats []byte, dData *DeleteData, meta *etcdpb.CollectionMeta) (*segPaths, error)
 }
 
 type binlogIO struct {
@@ -110,6 +110,7 @@ func (b *binlogIO) upload(
 	ctx context.Context,
 	segID, partID UniqueID,
 	iDatas []*InsertData,
+	segStats []byte,
 	dData *DeleteData,
 	meta *etcdpb.CollectionMeta) (*segPaths, error) {
 
@@ -131,7 +132,7 @@ func (b *binlogIO) upload(
 			continue
 		}
 
-		kv, inpaths, statspaths, err := b.genInsertBlobs(iData, partID, segID, meta)
+		kv, inpaths, err := b.genInsertBlobs(iData, partID, segID, meta)
 		if err != nil {
 			log.Warn("generate insert blobs wrong",
 				zap.Int64("collectionID", meta.GetID()),
@@ -153,16 +154,25 @@ func (b *binlogIO) upload(
 			}
 			insertField2Path[fID] = tmpBinlog
 		}
+	}
 
-		for fID, path := range statspaths {
-			tmpBinlog, ok := statsField2Path[fID]
-			if !ok {
-				tmpBinlog = path
-			} else {
-				tmpBinlog.Binlogs = append(tmpBinlog.Binlogs, path.GetBinlogs()...)
-			}
-			statsField2Path[fID] = tmpBinlog
-		}
+	pkID := getPKID(meta)
+	if pkID == common.InvalidFieldID {
+		log.Error("get invalid field id when finding pk", zap.Int64("collectionID", meta.GetID()), zap.Any("fields", meta.GetSchema().GetFields()))
+		return nil, errors.New("invalid pk id")
+	}
+	logID, err := b.allocID()
+	if err != nil {
+		return nil, err
+	}
+	k := JoinIDPath(meta.GetID(), partID, segID, pkID, logID)
+	key := path.Join(b.ChunkManager.RootPath(), common.SegmentStatslogPath, k)
+	fileLen := len(segStats)
+
+	kvs[key] = segStats
+	statsField2Path[pkID] = &datapb.FieldBinlog{
+		FieldID: pkID,
+		Binlogs: []*datapb.Binlog{{LogSize: int64(fileLen), LogPath: key}},
 	}
 
 	for _, path := range insertField2Path {
@@ -195,7 +205,7 @@ func (b *binlogIO) upload(
 		})
 	}
 
-	var err = errStart
+	err = errStart
 	for err != nil {
 		select {
 		case <-ctx.Done():
@@ -237,25 +247,24 @@ func (b *binlogIO) genDeltaBlobs(data *DeleteData, collID, partID, segID UniqueI
 }
 
-// genInsertBlobs returns kvs, insert-paths, stats-paths
-func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, meta *etcdpb.CollectionMeta) (map[string][]byte, map[UniqueID]*datapb.FieldBinlog, map[UniqueID]*datapb.FieldBinlog, error) {
+// genInsertBlobs returns kvs and insert-paths
+func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, meta *etcdpb.CollectionMeta) (map[string][]byte, map[UniqueID]*datapb.FieldBinlog, error) {
 	inCodec := storage.NewInsertCodec(meta)
 
-	inlogs, statslogs, err := inCodec.Serialize(partID, segID, data)
+	inlogs, _, err := inCodec.Serialize(partID, segID, data)
 	if err != nil {
-		return nil, nil, nil, err
+		return nil, nil, err
 	}
 
 	var (
-		kvs        = make(map[string][]byte, len(inlogs)+len(statslogs))
-		inpaths    = make(map[UniqueID]*datapb.FieldBinlog)
-		statspaths = make(map[UniqueID]*datapb.FieldBinlog)
+		kvs     = make(map[string][]byte, len(inlogs)+1)
+		inpaths = make(map[UniqueID]*datapb.FieldBinlog)
 	)
 
 	notifyGenIdx := make(chan struct{})
 	defer close(notifyGenIdx)
 
-	generator, err := b.idxGenerator(len(inlogs)+len(statslogs), notifyGenIdx)
+	generator, err := b.idxGenerator(len(inlogs)+1, notifyGenIdx)
 	if err != nil {
-		return nil, nil, nil, err
+		return nil, nil, err
 	}
 
 	for _, blob := range inlogs {
@@ -274,24 +283,7 @@ func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, meta
 		}
 	}
 
-	for _, blob := range statslogs {
-		// Blob Key is generated by Serialize from int64 fieldID in collection schema, which won't raise error in ParseInt
-		fID, _ := strconv.ParseInt(blob.GetKey(), 10, 64)
-
-		k := JoinIDPath(meta.GetID(), partID, segID, fID, <-generator)
-		key := path.Join(b.ChunkManager.RootPath(), common.SegmentStatslogPath, k)
-
-		value := blob.GetValue()
-		fileLen := len(value)
-
-		kvs[key] = value
-		statspaths[fID] = &datapb.FieldBinlog{
-			FieldID: fID,
-			Binlogs: []*datapb.Binlog{{LogSize: int64(fileLen), LogPath: key}},
-		}
-	}
-
-	return kvs, inpaths, statspaths, nil
+	return kvs, inpaths, nil
 }
 
 func (b *binlogIO) idxGenerator(n int, done <-chan struct{}) (<-chan UniqueID, error) {
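With the per-field statslog loop gone, upload now writes exactly one statslog per segment, keyed under the primary-key field with a single freshly allocated log ID. A sketch of the resulting object-key layout, using a hypothetical joinIDPath in place of the datanode's JoinIDPath helper and literal stand-ins for common.SegmentStatslogPath and the chunk-manager root:

package main

import (
	"fmt"
	"path"
	"strconv"
)

// joinIDPath is a stand-in for the datanode's JoinIDPath: it joins IDs into a
// slash-separated key fragment.
func joinIDPath(ids ...int64) string {
	parts := make([]string, 0, len(ids))
	for _, id := range ids {
		parts = append(parts, strconv.FormatInt(id, 10))
	}
	return path.Join(parts...)
}

func main() {
	const segmentStatslogPath = "stats_log" // stand-in for common.SegmentStatslogPath
	const rootPath = "files"                // stand-in for ChunkManager.RootPath()

	collID, partID, segID := int64(1), int64(10), int64(100)
	pkID, logID := int64(106), int64(10001) // pk field ID and the allocated log ID

	key := path.Join(rootPath, segmentStatslogPath, joinIDPath(collID, partID, segID, pkID, logID))
	fmt.Println(key) // files/stats_log/1/10/100/106/10001
}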
diff --git a/internal/datanode/binlog_io_test.go b/internal/datanode/binlog_io_test.go
index 9b64e8fc10..6c63e78474 100644
--- a/internal/datanode/binlog_io_test.go
+++ b/internal/datanode/binlog_io_test.go
@@ -27,7 +27,6 @@ import (
 	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/proto/schemapb"
 	"github.com/milvus-io/milvus/internal/storage"
-	"github.com/milvus-io/milvus/internal/util/typeutil"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/zap"
@@ -53,7 +52,7 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
 			Tss: []uint64{666666},
 		}
 
-		p, err := b.upload(context.TODO(), 1, 10, []*InsertData{iData}, dData, meta)
+		p, err := b.upload(context.TODO(), 1, 10, []*InsertData{iData}, []byte{}, dData, meta)
 		assert.NoError(t, err)
 		assert.Equal(t, 12, len(p.inPaths))
 		assert.Equal(t, 1, len(p.statsPaths))
@@ -61,18 +60,18 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
 		assert.Equal(t, 1, len(p.statsPaths[0].GetBinlogs()))
 		assert.NotNil(t, p.deltaInfo)
 
-		p, err = b.upload(context.TODO(), 1, 10, []*InsertData{iData, iData}, dData, meta)
+		p, err = b.upload(context.TODO(), 1, 10, []*InsertData{iData, iData}, []byte{}, dData, meta)
 		assert.NoError(t, err)
 		assert.Equal(t, 12, len(p.inPaths))
 		assert.Equal(t, 1, len(p.statsPaths))
 		assert.Equal(t, 2, len(p.inPaths[0].GetBinlogs()))
-		assert.Equal(t, 2, len(p.statsPaths[0].GetBinlogs()))
+		assert.Equal(t, 1, len(p.statsPaths[0].GetBinlogs()))
 		assert.NotNil(t, p.deltaInfo)
 
 		ctx, cancel := context.WithCancel(context.Background())
 		cancel()
 
-		p, err = b.upload(ctx, 1, 10, []*InsertData{iData}, dData, meta)
+		p, err = b.upload(ctx, 1, 10, []*InsertData{iData}, []byte{}, dData, meta)
 		assert.EqualError(t, err, errUploadToBlobStorage.Error())
 		assert.Nil(t, p)
 	})
@@ -86,17 +85,17 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
 		}
 
 		iData := genEmptyInsertData()
-		p, err := b.upload(context.TODO(), 1, 10, []*InsertData{iData}, dData, meta)
+		p, err := b.upload(context.TODO(), 1, 10, []*InsertData{iData}, []byte{}, dData, meta)
 		assert.NoError(t, err)
 		assert.Empty(t, p.inPaths)
-		assert.Empty(t, p.statsPaths)
+		assert.NotEmpty(t, p.statsPaths)
 		assert.Empty(t, p.deltaInfo)
 
 		iData = &InsertData{Data: make(map[int64]storage.FieldData)}
-		p, err = b.upload(context.TODO(), 1, 10, []*InsertData{iData}, dData, meta)
+		p, err = b.upload(context.TODO(), 1, 10, []*InsertData{iData}, []byte{}, dData, meta)
 		assert.NoError(t, err)
 		assert.Empty(t, p.inPaths)
-		assert.Empty(t, p.statsPaths)
+		assert.NotEmpty(t, p.statsPaths)
 		assert.Empty(t, p.deltaInfo)
 
 		iData = genInsertData()
@@ -105,7 +104,7 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
 			Tss:      []uint64{1},
 			RowCount: 1,
 		}
-		p, err = b.upload(context.TODO(), 1, 10, []*InsertData{iData}, dData, meta)
+		p, err = b.upload(context.TODO(), 1, 10, []*InsertData{iData}, []byte{}, dData, meta)
 		assert.Error(t, err)
 		assert.Empty(t, p)
 
@@ -119,9 +118,23 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
 			RowCount: 1,
 		}
 		ctx, cancel := context.WithTimeout(context.TODO(), 20*time.Millisecond)
-		p, err = bin.upload(ctx, 1, 10, []*InsertData{iData}, dData, meta)
+		p, err = bin.upload(ctx, 1, 10, []*InsertData{iData}, []byte{}, dData, meta)
 		assert.Error(t, err)
 		assert.Empty(t, p)
+
+		alloc.isvalid = false
+		p, err = bin.upload(ctx, 1, 10, []*InsertData{iData}, []byte{}, dData, meta)
+		assert.Error(t, err)
+		assert.Empty(t, p)
+
+		alloc.isvalid = true
+		for _, field := range meta.GetSchema().GetFields() {
+			field.IsPrimaryKey = false
+		}
+		p, err = bin.upload(ctx, 1, 10, []*InsertData{iData}, []byte{}, dData, meta)
+		assert.Error(t, err)
+		assert.Empty(t, p)
+
 		cancel()
 	})
@@ -254,60 +267,55 @@ func TestBinlogIOInnerMethods(t *testing.T) {
 		tests := []struct {
 			pkType      schemapb.DataType
 			description string
+			expectError bool
 		}{
-			{schemapb.DataType_Int64, "int64PrimaryField"},
-			{schemapb.DataType_VarChar, "varCharPrimaryField"},
+			{schemapb.DataType_Int64, "int64PrimaryField", false},
+			{schemapb.DataType_VarChar, "varCharPrimaryField", false},
 		}
 
 		for _, test := range tests {
 			t.Run(test.description, func(t *testing.T) {
 				meta := f.GetCollectionMeta(UniqueID(10001), "test_gen_blobs", test.pkType)
-				helper, err := typeutil.CreateSchemaHelper(meta.Schema)
-				assert.NoError(t, err)
-				primaryKeyFieldSchema, err := helper.GetPrimaryKeyField()
-				assert.NoError(t, err)
-				primaryKeyFieldID := primaryKeyFieldSchema.GetFieldID()
 
-				kvs, pin, pstats, err := b.genInsertBlobs(genInsertData(), 10, 1, meta)
+				kvs, pin, err := b.genInsertBlobs(genInsertData(), 10, 1, meta)
+				if test.expectError {
+					assert.Error(t, err)
+					return
+				}
 				assert.NoError(t, err)
 
-				assert.Equal(t, 1, len(pstats))
 				assert.Equal(t, 12, len(pin))
-				assert.Equal(t, 13, len(kvs))
+				assert.Equal(t, 12, len(kvs))
 
 				log.Debug("test paths",
 					zap.Any("kvs no.", len(kvs)),
-					zap.String("insert paths field0", pin[common.TimeStampField].GetBinlogs()[0].GetLogPath()),
-					zap.String("stats paths field0", pstats[primaryKeyFieldID].GetBinlogs()[0].GetLogPath()))
+					zap.String("insert paths field0", pin[common.TimeStampField].GetBinlogs()[0].GetLogPath()))
 			})
 		}
 	})
 
 	t.Run("Test genInsertBlobs error", func(t *testing.T) {
-		kvs, pin, pstats, err := b.genInsertBlobs(&InsertData{}, 1, 1, nil)
+		kvs, pin, err := b.genInsertBlobs(&InsertData{}, 1, 1, nil)
 		assert.Error(t, err)
 		assert.Empty(t, kvs)
 		assert.Empty(t, pin)
-		assert.Empty(t, pstats)
 
 		f := &MetaFactory{}
 		meta := f.GetCollectionMeta(UniqueID(10001), "test_gen_blobs", schemapb.DataType_Int64)
 
-		kvs, pin, pstats, err = b.genInsertBlobs(genEmptyInsertData(), 10, 1, meta)
+		kvs, pin, err = b.genInsertBlobs(genEmptyInsertData(), 10, 1, meta)
 		assert.Error(t, err)
 		assert.Empty(t, kvs)
 		assert.Empty(t, pin)
-		assert.Empty(t, pstats)
 
 		errAlloc := NewAllocatorFactory()
 		errAlloc.errAllocBatch = true
 		bin := &binlogIO{cm, errAlloc}
-		kvs, pin, pstats, err = bin.genInsertBlobs(genInsertData(), 10, 1, meta)
+		kvs, pin, err = bin.genInsertBlobs(genInsertData(), 10, 1, meta)
 		assert.Error(t, err)
 		assert.Empty(t, kvs)
 		assert.Empty(t, pin)
-		assert.Empty(t, pstats)
 	})
 
 	t.Run("Test idxGenerator", func(t *testing.T) {
diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go
index f5d1a0c439..cae51b3562 100644
--- a/internal/datanode/compactor.go
+++ b/internal/datanode/compactor.go
@@ -25,6 +25,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/bits-and-blooms/bloom/v3"
 	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/metrics"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
@@ -128,6 +129,27 @@ func (t *compactionTask) getChannelName() string {
 	return t.plan.GetChannel()
 }
 
+func (t *compactionTask) getPlanTargetEntryNumber() int64 {
+	if t.plan == nil {
+		// if the plan is empty, return the default size
+		return int64(bloomFilterSize)
+	}
+	var result int64
+	for _, info := range t.plan.GetSegmentBinlogs() {
+		for _, fieldLog := range info.GetFieldBinlogs() {
+			for _, binlog := range fieldLog.GetBinlogs() {
+				result += binlog.GetEntriesNum()
+			}
+		}
+	}
+
+	// prevent the bloom filter from being sized too small
+	if result == 0 {
+		return int64(bloomFilterSize)
+	}
+	return result
+}
+
 func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelTs Timestamp) (map[interface{}]Timestamp, *DelDataBuf, error) {
 	mergeStart := time.Now()
 	dCodec := storage.NewDeleteCodec()
@@ -186,7 +208,7 @@ func nano2Milli(nano time.Duration) float64 {
 	return float64(nano) / float64(time.Millisecond)
 }
 
-func (t *compactionTask) merge(mergeItr iterator, delta map[interface{}]Timestamp, schema *schemapb.CollectionSchema, currentTs Timestamp) ([]*InsertData, int64, error) {
+func (t *compactionTask) merge(mergeItr iterator, delta map[interface{}]Timestamp, schema *schemapb.CollectionSchema, currentTs Timestamp) ([]*InsertData, []byte, int64, error) {
 	mergeStart := time.Now()
 
 	var (
@@ -196,6 +218,11 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[interface{}]Timestam
 		expired int64 // the number of expired entity
 		err     error
 
+		// statslog generation
+		segment *Segment // empty segment used for bf generation
+		pkID    UniqueID
+		pkType  schemapb.DataType
+
 		iDatas      = make([]*InsertData, 0)
 		fID2Type    = make(map[UniqueID]schemapb.DataType)
 		fID2Content = make(map[UniqueID][]interface{})
@@ -209,16 +236,27 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[interface{}]Timestam
 		return false
 	}
 
+	// estimate the target row count to size the bloom filter
+	targetRowCount := t.getPlanTargetEntryNumber()
+	log.Debug("merge estimate target row count", zap.Int64("row count", targetRowCount))
+	segment = &Segment{
+		pkFilter: bloom.NewWithEstimates(uint(targetRowCount), maxBloomFalsePositive),
+	}
+
 	// get dim
 	for _, fs := range schema.GetFields() {
 		fID2Type[fs.GetFieldID()] = fs.GetDataType()
+		if fs.GetIsPrimaryKey() {
+			pkID = fs.GetFieldID()
+			pkType = fs.GetDataType()
+		}
 		if fs.GetDataType() == schemapb.DataType_FloatVector ||
 			fs.GetDataType() == schemapb.DataType_BinaryVector {
 			for _, t := range fs.GetTypeParams() {
 				if t.Key == "dim" {
 					if dim, err = strconv.Atoi(t.Value); err != nil {
 						log.Warn("strconv wrong on get dim", zap.Error(err))
-						return nil, 0, err
+						return nil, nil, 0, err
 					}
 					break
 				}
@@ -234,7 +272,7 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[interface{}]Timestam
 		v, ok := vInter.(*storage.Value)
 		if !ok {
 			log.Warn("transfer interface to Value wrong")
-			return nil, 0, errors.New("unexpected error")
+			return nil, nil, 0, errors.New("unexpected error")
 		}
 
 		if isDeletedValue(v) {
@@ -251,7 +289,7 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[interface{}]Timestam
 		row, ok := v.Value.(map[UniqueID]interface{})
 		if !ok {
 			log.Warn("transfer interface to map wrong")
-			return nil, 0, errors.New("unexpected error")
+			return nil, nil, 0, errors.New("unexpected error")
 		}
 
 		for fID, vInter := range row {
@@ -278,7 +316,7 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[interface{}]Timestam
 		tp, ok := fID2Type[fID]
 		if !ok {
 			log.Warn("no field ID in this schema", zap.Int64("fieldID", fID))
-			return nil, 0, errors.New("Unexpected error")
+			return nil, nil, 0, errors.New("Unexpected error")
 		}
 
 		for i := 0; i < numBinlogs; i++ {
@@ -294,17 +332,30 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[interface{}]Timestam
 
 			if err != nil {
 				log.Warn("transfer interface to FieldData wrong", zap.Error(err))
-				return nil, 0, err
+				return nil, nil, 0, err
+			}
+
+			if fID == pkID {
+				err = segment.updatePKRange(fData)
+				if err != nil {
+					log.Warn("update pk range failed", zap.Error(err))
+					return nil, nil, 0, err
+				}
 			}
 
 			iDatas[i].Data[fID] = fData
 		}
+	}
 
+	// marshal segment statslog
+	segStats, err := segment.getSegmentStatslog(pkID, pkType)
+	if err != nil {
+		log.Warn("failed to generate segment statslog", zap.Int64("pkID", pkID), zap.Error(err))
+		return nil, nil, 0, err
 	}
 
 	log.Debug("merge end", zap.Int64("planID", t.getPlanID()),
 		zap.Int64("remaining insert numRows", numRows),
 		zap.Int64("expired entities", expired),
 		zap.Float64("elapse in ms", nano2Milli(time.Since(mergeStart))))
-	return iDatas, numRows, nil
+	return iDatas, segStats, numRows, nil
 }
 
@@ -471,14 +522,14 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
 		return nil, err
 	}
 
-	iDatas, numRows, err := t.merge(mergeItr, deltaPk2Ts, meta.GetSchema(), t.GetCurrentTime())
+	iDatas, segStats, numRows, err := t.merge(mergeItr, deltaPk2Ts, meta.GetSchema(), t.GetCurrentTime())
 	if err != nil {
 		log.Error("compact wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err))
 		return nil, err
 	}
 
 	uploadStart := time.Now()
-	segPaths, err := t.upload(ctxTimeout, targetSegID, partID, iDatas, deltaBuf.delData, meta)
+	segPaths, err := t.upload(ctxTimeout, targetSegID, partID, iDatas, segStats, deltaBuf.delData, meta)
 	if err != nil {
 		log.Error("compact wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err))
 		return nil, err
 	}
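getPlanTargetEntryNumber sizes the merged segment's bloom filter pessimistically: it sums EntriesNum over every binlog in the compaction plan, which is the row count the merge would produce if nothing were deleted or expired, and falls back to a default when the plan carries no counts. A standalone sketch of that sizing with bits-and-blooms; the two constants are assumptions standing in for the datanode's bloomFilterSize and maxBloomFalsePositive:

package main

import (
	"fmt"

	"github.com/bits-and-blooms/bloom/v3"
)

func main() {
	const defaultSize = 100000      // stand-in for bloomFilterSize
	const falsePositiveRate = 0.005 // stand-in for maxBloomFalsePositive

	// Entry counts as they would be summed from the plan's segment binlogs.
	entriesPerBinlog := []int64{50000, 120000, 30000}
	var target int64
	for _, n := range entriesPerBinlog {
		target += n
	}
	if target == 0 {
		target = defaultSize // guard against a filter that is too small
	}

	bf := bloom.NewWithEstimates(uint(target), falsePositiveRate)
	bf.Add([]byte("primary-key-bytes"))
	fmt.Println(bf.Test([]byte("primary-key-bytes"))) // true
	fmt.Printf("m=%d bits, k=%d hashes\n", bf.Cap(), bf.K())
}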
diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go
index 69d9092b24..8968735d2e 100644
--- a/internal/datanode/compactor_test.go
+++ b/internal/datanode/compactor_test.go
@@ -24,6 +24,7 @@ import (
 	"time"
 
 	memkv "github.com/milvus-io/milvus/internal/kv/mem"
+	"github.com/milvus-io/milvus/internal/proto/commonpb"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/proto/etcdpb"
 	"github.com/milvus-io/milvus/internal/proto/internalpb"
@@ -264,11 +265,12 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 		}
 
 		ct := &compactionTask{}
-		idata, numOfRow, err := ct.merge(mitr, dm, meta.GetSchema(), ct.GetCurrentTime())
+		idata, segStats, numOfRow, err := ct.merge(mitr, dm, meta.GetSchema(), ct.GetCurrentTime())
 		assert.NoError(t, err)
 		assert.Equal(t, int64(2), numOfRow)
 		assert.Equal(t, 1, len(idata))
 		assert.NotEmpty(t, idata[0].Data)
+		assert.NotEmpty(t, segStats)
 	})
 	t.Run("Merge without expiration2", func(t *testing.T) {
 		Params.CommonCfg.EntityExpirationTTL = 0
@@ -291,11 +293,12 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 		dm := map[interface{}]Timestamp{}
 
 		ct := &compactionTask{}
-		idata, numOfRow, err := ct.merge(mitr, dm, meta.GetSchema(), ct.GetCurrentTime())
+		idata, segStats, numOfRow, err := ct.merge(mitr, dm, meta.GetSchema(), ct.GetCurrentTime())
 		assert.NoError(t, err)
 		assert.Equal(t, int64(2), numOfRow)
 		assert.Equal(t, 2, len(idata))
 		assert.NotEmpty(t, idata[0].Data)
+		assert.NotEmpty(t, segStats)
 	})
 
 	t.Run("Merge with expiration", func(t *testing.T) {
@@ -315,10 +318,63 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 		}
 
 		ct := &compactionTask{}
-		idata, numOfRow, err := ct.merge(mitr, dm, meta.GetSchema(), genTimestamp())
+		idata, segStats, numOfRow, err := ct.merge(mitr, dm, meta.GetSchema(), genTimestamp())
 		assert.NoError(t, err)
 		assert.Equal(t, int64(1), numOfRow)
 		assert.Equal(t, 1, len(idata))
+		assert.NotEmpty(t, segStats)
+	})
+
+	t.Run("Merge with meta error", func(t *testing.T) {
+		Params.CommonCfg.EntityExpirationTTL = 0
+		iData := genInsertDataWithExpiredTS()
+		meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64)
+
+		iblobs, err := getInsertBlobs(100, iData, meta)
+		require.NoError(t, err)
+
+		iitr, err := storage.NewInsertBinlogIterator(iblobs, 106, schemapb.DataType_Int64)
+		require.NoError(t, err)
+
+		mitr := storage.NewMergeIterator([]iterator{iitr})
+
+		dm := map[interface{}]Timestamp{
+			1: 10000,
+		}
+
+		ct := &compactionTask{}
+		_, _, _, err = ct.merge(mitr, dm, &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
+			{DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{
+				{Key: "dim", Value: "64"},
+			}},
+		}}, ct.GetCurrentTime())
+		assert.Error(t, err)
+	})
+
+	t.Run("Merge with meta type param error", func(t *testing.T) {
+		Params.CommonCfg.EntityExpirationTTL = 0
+		iData := genInsertDataWithExpiredTS()
+		meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64)
+
+		iblobs, err := getInsertBlobs(100, iData, meta)
+		require.NoError(t, err)
+
+		iitr, err := storage.NewInsertBinlogIterator(iblobs, 106, schemapb.DataType_Int64)
+		require.NoError(t, err)
+
+		mitr := storage.NewMergeIterator([]iterator{iitr})
+
+		dm := map[interface{}]Timestamp{
+			1: 10000,
+		}
+
+		ct := &compactionTask{}
+		_, _, _, err = ct.merge(mitr, dm, &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
+			{DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{
+				{Key: "dim", Value: "dim"},
+			}},
+		}}, ct.GetCurrentTime())
+		assert.Error(t, err)
 	})
 })
@@ -503,7 +559,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
 			RowCount: 1,
 		}
 
-		cpaths, err := mockbIO.upload(context.TODO(), c.segID, c.parID, []*InsertData{iData}, dData, meta)
+		cpaths, err := mockbIO.upload(context.TODO(), c.segID, c.parID, []*InsertData{iData}, []byte{}, dData, meta)
 		require.NoError(t, err)
 		require.Equal(t, 12, len(cpaths.inPaths))
 		segBinlogs := []*datapb.CompactionSegmentBinlogs{
@@ -553,7 +609,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
 		err = cm.RemoveWithPrefix("/")
 		require.NoError(t, err)
 
-		cpaths, err = mockbIO.upload(context.TODO(), c.segID, c.parID, []*InsertData{iData}, deleteAllData, meta)
+		cpaths, err = mockbIO.upload(context.TODO(), c.segID, c.parID, []*InsertData{iData}, []byte{}, deleteAllData, meta)
 		require.NoError(t, err)
 		plan.PlanID++
@@ -568,7 +624,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
 		// Compact empty segment
 		err = cm.RemoveWithPrefix("/")
 		require.NoError(t, err)
-		cpaths, err = mockbIO.upload(context.TODO(), c.segID, c.parID, []*InsertData{iData}, dData, meta)
+		cpaths, err = mockbIO.upload(context.TODO(), c.segID, c.parID, []*InsertData{iData}, []byte{}, dData, meta)
 		require.NoError(t, err)
 		plan.PlanID = 999876
 		segmentBinlogsWithEmptySegment := []*datapb.CompactionSegmentBinlogs{
@@ -585,7 +641,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
 		// Deltas in timetravel range
 		err = cm.RemoveWithPrefix("/")
 		require.NoError(t, err)
-		cpaths, err = mockbIO.upload(context.TODO(), c.segID, c.parID, []*InsertData{iData}, dData, meta)
+		cpaths, err = mockbIO.upload(context.TODO(), c.segID, c.parID, []*InsertData{iData}, []byte{}, dData, meta)
 		require.NoError(t, err)
 		plan.PlanID++
@@ -601,7 +657,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
 		// Timeout
 		err = cm.RemoveWithPrefix("/")
 		require.NoError(t, err)
-		cpaths, err = mockbIO.upload(context.TODO(), c.segID, c.parID, []*InsertData{iData}, dData, meta)
+		cpaths, err = mockbIO.upload(context.TODO(), c.segID, c.parID, []*InsertData{iData}, []byte{}, dData, meta)
 		require.NoError(t, err)
 		plan.PlanID++
@@ -678,11 +734,11 @@ func TestCompactorInterfaceMethods(t *testing.T) {
 			RowCount: 1,
 		}
 
-		cpaths1, err := mockbIO.upload(context.TODO(), c.segID1, c.parID, []*InsertData{iData1}, dData1, meta)
+		cpaths1, err := mockbIO.upload(context.TODO(), c.segID1, c.parID, []*InsertData{iData1}, []byte{}, dData1, meta)
 		require.NoError(t, err)
 		require.Equal(t, 12, len(cpaths1.inPaths))
 
-		cpaths2, err := mockbIO.upload(context.TODO(), c.segID2, c.parID, []*InsertData{iData2}, dData2, meta)
+		cpaths2, err := mockbIO.upload(context.TODO(), c.segID2, c.parID, []*InsertData{iData2}, []byte{}, dData2, meta)
 		require.NoError(t, err)
 		require.Equal(t, 12, len(cpaths2.inPaths))
@@ -810,11 +866,11 @@ func TestCompactorInterfaceMethods(t *testing.T) {
 			RowCount: 0,
 		}
 
-		cpaths1, err := mockbIO.upload(context.TODO(), segID1, partID, []*InsertData{iData1}, dData1, meta)
+		cpaths1, err := mockbIO.upload(context.TODO(), segID1, partID, []*InsertData{iData1}, []byte{}, dData1, meta)
 		require.NoError(t, err)
 		require.Equal(t, 12, len(cpaths1.inPaths))
 
-		cpaths2, err := mockbIO.upload(context.TODO(), segID2, partID, []*InsertData{iData2}, dData2, meta)
+		cpaths2, err := mockbIO.upload(context.TODO(), segID2, partID, []*InsertData{iData2}, []byte{}, dData2, meta)
 		require.NoError(t, err)
 		require.Equal(t, 12, len(cpaths2.inPaths))
@@ -862,7 +918,7 @@ type mockFlushManager struct {
 
 var _ flushManager = (*mockFlushManager)(nil)
 
-func (mfm *mockFlushManager) flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, dropped bool, pos *internalpb.MsgPosition) error {
+func (mfm *mockFlushManager) flushBufferData(data *BufferData, segStats []byte, segmentID UniqueID, flushed bool, dropped bool, pos *internalpb.MsgPosition) error {
 	if mfm.returnError {
 		return fmt.Errorf("mock error")
 	}
diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go
index 5f99a08431..78837084df 100644
--- a/internal/datanode/flow_graph_insert_buffer_node.go
+++ b/internal/datanode/flow_graph_insert_buffer_node.go
@@ -384,8 +384,16 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
 				zap.Bool("dropped", task.dropped),
 				zap.Any("pos", endPositions[0]),
 			)
+
+			segStats, err := ibNode.replica.getSegmentStatslog(task.segmentID)
+			if err != nil {
+				log.Error("failed to get segment stats log", zap.Int64("segmentID", task.segmentID), zap.Error(err))
+				panic(err)
+			}
+
 			err = retry.Do(ibNode.ctx, func() error {
 				return ibNode.flushManager.flushBufferData(task.buffer,
+					segStats,
 					task.segmentID,
 					task.flushed,
 					task.dropped,
diff --git a/internal/datanode/flush_manager.go b/internal/datanode/flush_manager.go
index aeee5dd5cb..de690f7ef7 100644
--- a/internal/datanode/flush_manager.go
+++ b/internal/datanode/flush_manager.go
@@ -41,7 +41,7 @@ import (
 // flushManager defines a flush manager signature
 type flushManager interface {
 	// notify flush manager insert buffer data
-	flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, dropped bool, pos *internalpb.MsgPosition) error
+	flushBufferData(data *BufferData, segStats []byte, segmentID UniqueID, flushed bool, dropped bool, pos *internalpb.MsgPosition) error
 	// notify flush manager del buffer data
 	flushDelData(data *DelDataBuf, segmentID UniqueID, pos *internalpb.MsgPosition) error
 	// injectFlush injects compaction or other blocking task before flush sync
@@ -335,7 +335,7 @@ func (m *rendezvousFlushManager) handleDeleteTask(segmentID UniqueID, task flush
 
 // flushBufferData notifies flush manager insert buffer data.
 // This method will be retried on errors. Final errors will be propagated upstream and logged.
-func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID UniqueID, flushed bool,
+func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segStats []byte, segmentID UniqueID, flushed bool,
 	dropped bool, pos *internalpb.MsgPosition) error {
 	tr := timerecord.NewTimeRecorder("flushDuration")
 	// empty flush
@@ -360,12 +360,13 @@ func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID Uni
 
 	// encode data and convert output data
 	inCodec := storage.NewInsertCodec(meta)
-	binLogs, statsBinlogs, err := inCodec.Serialize(partID, segmentID, data.buffer)
+	binLogs, _, err := inCodec.Serialize(partID, segmentID, data.buffer)
 	if err != nil {
 		return err
 	}
 
-	start, _, err := m.allocIDBatch(uint32(len(binLogs)))
+	// binlogs + 1 statslog
+	start, _, err := m.allocIDBatch(uint32(len(binLogs) + 1))
 	if err != nil {
 		return err
 	}
@@ -400,27 +401,22 @@ func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID Uni
 
 	field2Stats := make(map[UniqueID]*datapb.Binlog)
 	// write stats binlog
-	for _, blob := range statsBinlogs {
-		fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64)
-		if err != nil {
-			log.Error("Flush failed ... cannot parse string to fieldID ..", zap.Error(err))
-			return err
-		}
-		logidx := field2Logidx[fieldID]
-
-		// no error raise if alloc=false
-		k := JoinIDPath(collID, partID, segmentID, fieldID, logidx)
-
-		key := path.Join(m.ChunkManager.RootPath(), common.SegmentStatslogPath, k)
-		kvs[key] = blob.Value
-		field2Stats[fieldID] = &datapb.Binlog{
-			EntriesNum:    0,
-			TimestampFrom: 0, //TODO
-			TimestampTo:   0, //TODO,
-			LogPath:       key,
-			LogSize:       int64(len(blob.Value)),
-		}
+	pkID := getPKID(meta)
+	if pkID == common.InvalidFieldID {
+		return fmt.Errorf("failed to get pk id for segment %d", segmentID)
+	}
+
+	logidx := start + int64(len(binLogs))
+	k := JoinIDPath(collID, partID, segmentID, pkID, logidx)
+	key := path.Join(m.ChunkManager.RootPath(), common.SegmentStatslogPath, k)
+	kvs[key] = segStats
+	field2Stats[pkID] = &datapb.Binlog{
+		EntriesNum:    0,
+		TimestampFrom: 0, //TODO
+		TimestampTo:   0, //TODO,
+		LogPath:       key,
+		LogSize:       int64(len(segStats)),
 	}
 
 	m.updateSegmentCheckPoint(segmentID)
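The allocation change is easy to miss but load-bearing: flushBufferData now requests len(binLogs)+1 IDs in one batch and reserves the last one, start + len(binLogs), as the statslog's logidx, avoiding a second allocator round trip. A toy sketch of that budgeting with made-up values and a simplified stand-in for datapb.Binlog:

package main

import "fmt"

// binlogMeta is a simplified stand-in for datapb.Binlog.
type binlogMeta struct {
	LogPath string
	LogSize int64
}

func main() {
	numBinlogs := 12     // insert binlogs produced by Serialize
	start := int64(9000) // pretend allocIDBatch(uint32(numBinlogs + 1)) returned this

	for i := 0; i < numBinlogs; i++ {
		_ = start + int64(i) // logidx consumed by the i-th insert binlog
	}
	statsLogidx := start + int64(numBinlogs) // the reserved final ID

	segStats := []byte(`{"fieldID":106}`) // placeholder statslog payload
	stats := binlogMeta{
		LogPath: fmt.Sprintf("files/stats_log/1/10/100/106/%d", statsLogidx),
		LogSize: int64(len(segStats)),
	}
	fmt.Printf("%+v\n", stats)
}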
diff --git a/internal/datanode/flush_manager_test.go b/internal/datanode/flush_manager_test.go
index 17f5ebdd7b..b88bba7dae 100644
--- a/internal/datanode/flush_manager_test.go
+++ b/internal/datanode/flush_manager_test.go
@@ -166,7 +166,7 @@ func TestRendezvousFlushManager(t *testing.T) {
 			m.flushDelData(nil, 1, &internalpb.MsgPosition{
 				MsgID: ids[i],
 			})
-			m.flushBufferData(nil, 1, true, false, &internalpb.MsgPosition{
+			m.flushBufferData(nil, nil, 1, true, false, &internalpb.MsgPosition{
 				MsgID: ids[i],
 			})
 			wg.Done()
@@ -213,7 +213,7 @@ func TestRendezvousFlushManager_Inject(t *testing.T) {
 			m.flushDelData(nil, 1, &internalpb.MsgPosition{
 				MsgID: ids[i],
 			})
-			m.flushBufferData(nil, 1, true, false, &internalpb.MsgPosition{
+			m.flushBufferData(nil, nil, 1, true, false, &internalpb.MsgPosition{
 				MsgID: ids[i],
 			})
 			wg.Done()
@@ -228,10 +228,10 @@ func TestRendezvousFlushManager_Inject(t *testing.T) {
 	rand.Read(id)
 	id2 := make([]byte, 10)
 	rand.Read(id2)
-	m.flushBufferData(nil, 2, true, false, &internalpb.MsgPosition{
+	m.flushBufferData(nil, nil, 2, true, false, &internalpb.MsgPosition{
 		MsgID: id,
 	})
-	m.flushBufferData(nil, 3, true, false, &internalpb.MsgPosition{
+	m.flushBufferData(nil, nil, 3, true, false, &internalpb.MsgPosition{
 		MsgID: id2,
 	})
 
@@ -256,7 +256,7 @@ func TestRendezvousFlushManager_Inject(t *testing.T) {
 
 	finish.Add(1)
 	rand.Read(id)
-	m.flushBufferData(nil, 2, false, false, &internalpb.MsgPosition{
+	m.flushBufferData(nil, nil, 2, false, false, &internalpb.MsgPosition{
 		MsgID: id,
 	})
 	ti = newTaskInjection(1, func(pack *segmentFlushPack) {
@@ -340,7 +340,7 @@ func TestRendezvousFlushManager_waitForAllFlushQueue(t *testing.T) {
 	mut.RUnlock()
 
 	for i := 0; i < size/2; i++ {
-		m.flushBufferData(nil, 1, true, false, &internalpb.MsgPosition{
+		m.flushBufferData(nil, nil, 1, true, false, &internalpb.MsgPosition{
 			MsgID: ids[i],
 		})
 	}
@@ -350,7 +350,7 @@ func TestRendezvousFlushManager_waitForAllFlushQueue(t *testing.T) {
 	mut.RUnlock()
 
 	for i := size / 2; i < size; i++ {
-		m.flushBufferData(nil, 1, true, false, &internalpb.MsgPosition{
+		m.flushBufferData(nil, nil, 1, true, false, &internalpb.MsgPosition{
 			MsgID: ids[i],
 		})
 	}
@@ -384,7 +384,7 @@ func TestRendezvousFlushManager_dropMode(t *testing.T) {
 	})
 
 	halfMsgID := []byte{1, 1, 1}
-	m.flushBufferData(nil, -1, true, false, &internalpb.MsgPosition{
+	m.flushBufferData(nil, nil, -1, true, false, &internalpb.MsgPosition{
 		MsgID: halfMsgID,
 	})
 
@@ -397,7 +397,7 @@ func TestRendezvousFlushManager_dropMode(t *testing.T) {
 	target := make(map[int64]struct{})
 	for i := 1; i < 11; i++ {
 		target[int64(i)] = struct{}{}
-		m.flushBufferData(nil, int64(i), true, false, &internalpb.MsgPosition{
+		m.flushBufferData(nil, nil, int64(i), true, false, &internalpb.MsgPosition{
 			MsgID: []byte{1},
 		})
 		m.flushDelData(nil, int64(i), &internalpb.MsgPosition{
@@ -436,7 +436,7 @@ func TestRendezvousFlushManager_dropMode(t *testing.T) {
 	})
 
 	halfMsgID := []byte{1, 1, 1}
-	m.flushBufferData(nil, -1, true, false, &internalpb.MsgPosition{
+	m.flushBufferData(nil, nil, -1, true, false, &internalpb.MsgPosition{
 		MsgID: halfMsgID,
 	})
 
@@ -457,7 +457,7 @@ func TestRendezvousFlushManager_dropMode(t *testing.T) {
 	})
 
 	for i := 1; i < 11; i++ {
-		m.flushBufferData(nil, int64(i), true, false, &internalpb.MsgPosition{
+		m.flushBufferData(nil, nil, int64(i), true, false, &internalpb.MsgPosition{
 			MsgID: []byte{1},
 		})
 		m.flushDelData(nil, int64(i), &internalpb.MsgPosition{
@@ -504,7 +504,7 @@ func TestRendezvousFlushManager_close(t *testing.T) {
 			m.flushDelData(nil, 1, &internalpb.MsgPosition{
 				MsgID: ids[i],
 			})
-			m.flushBufferData(nil, 1, true, false, &internalpb.MsgPosition{
+			m.flushBufferData(nil, nil, 1, true, false, &internalpb.MsgPosition{
 				MsgID: ids[i],
 			})
 			wg.Done()
diff --git a/internal/datanode/meta_util.go b/internal/datanode/meta_util.go
index 67358ed01a..4db45c1d95 100644
--- a/internal/datanode/meta_util.go
+++ b/internal/datanode/meta_util.go
@@ -16,7 +16,11 @@
 
 package datanode
 
-import "github.com/milvus-io/milvus/internal/proto/datapb"
+import (
+	"github.com/milvus-io/milvus/internal/common"
+	"github.com/milvus-io/milvus/internal/proto/datapb"
+	"github.com/milvus-io/milvus/internal/proto/etcdpb"
+)
 
 // reviseVChannelInfo will revise the datapb.VchannelInfo for upgrade compatibility from 2.0.2
 func reviseVChannelInfo(vChannel *datapb.VchannelInfo) {
@@ -61,3 +65,14 @@ func reviseVChannelInfo(vChannel *datapb.VchannelInfo) {
 	}
 	vChannel.DroppedSegmentIds = removeDuplicateSegmentIDFn(vChannel.GetDroppedSegmentIds())
 }
+
+// getPKID returns the primary key field id from collection meta.
+func getPKID(meta *etcdpb.CollectionMeta) UniqueID {
+	for _, field := range meta.GetSchema().GetFields() {
+		if field.GetIsPrimaryKey() {
+			return field.GetFieldID()
+		}
+	}
+
+	return common.InvalidFieldID
+}
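getPKID is the lookup now shared by binlog_io.upload and flushBufferData: a linear scan of the schema that falls back to common.InvalidFieldID, which callers turn into a hard error. The same logic over simplified types; the real helper takes an *etcdpb.CollectionMeta, and the concrete value of InvalidFieldID here is an assumption:

package main

import "fmt"

const invalidFieldID int64 = -1 // stand-in for common.InvalidFieldID

// fieldSchema is a simplified stand-in for schemapb.FieldSchema.
type fieldSchema struct {
	FieldID      int64
	IsPrimaryKey bool
}

// getPKID mirrors the helper added in meta_util.go.
func getPKID(fields []fieldSchema) int64 {
	for _, f := range fields {
		if f.IsPrimaryKey {
			return f.FieldID
		}
	}
	return invalidFieldID
}

func main() {
	fields := []fieldSchema{{FieldID: 100}, {FieldID: 106, IsPrimaryKey: true}}
	fmt.Println(getPKID(fields)) // 106
	fmt.Println(getPKID(nil))    // -1; callers reject the flush/upload in this case
}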
diff --git a/internal/datanode/segment_replica.go b/internal/datanode/segment_replica.go
index 653beca873..cb0e7e612f 100644
--- a/internal/datanode/segment_replica.go
+++ b/internal/datanode/segment_replica.go
@@ -18,6 +18,7 @@ package datanode
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"sync"
 	"sync/atomic"
@@ -79,6 +80,7 @@ type Replica interface {
 	refreshFlushedSegStatistics(segID UniqueID, numRows int64)
 	getSegmentStatisticsUpdates(segID UniqueID) (*datapb.SegmentStats, error)
 	segmentFlushed(segID UniqueID)
+	getSegmentStatslog(segID UniqueID) ([]byte, error)
 }
 
 // Segment is the data structure of segments in data node replica.
@@ -168,6 +170,18 @@ func (s *Segment) updatePKRange(ids storage.FieldData) error {
 	return nil
 }
 
+func (s *Segment) getSegmentStatslog(pkID UniqueID, pkType schemapb.DataType) ([]byte, error) {
+	pks := storage.PrimaryKeyStats{
+		FieldID: pkID,
+		PkType:  int64(pkType),
+		MaxPk:   s.maxPK,
+		MinPk:   s.minPK,
+		BF:      s.pkFilter,
+	}
+
+	return json.Marshal(pks)
+}
+
 var _ Replica = &SegmentReplica{}
 
 func newReplica(ctx context.Context, rc types.RootCoord, cm storage.ChunkManager, collID UniqueID) (*SegmentReplica, error) {
@@ -864,3 +878,37 @@ func (replica *SegmentReplica) listNotFlushedSegmentIDs() []UniqueID {
 
 	return segIDs
 }
+
+// getSegmentStatslog returns the segment statslog for the provided segment id.
+func (replica *SegmentReplica) getSegmentStatslog(segID UniqueID) ([]byte, error) {
+	replica.segMu.RLock()
+	defer replica.segMu.RUnlock()
+
+	schema, err := replica.getCollectionSchema(replica.collectionID, 0)
+	if err != nil {
+		return nil, err
+	}
+
+	var pkID UniqueID
+	var pkType schemapb.DataType
+	for _, field := range schema.GetFields() {
+		if field.GetIsPrimaryKey() {
+			pkID = field.GetFieldID()
+			pkType = field.GetDataType()
+		}
+	}
+
+	if seg, ok := replica.newSegments[segID]; ok {
+		return seg.getSegmentStatslog(pkID, pkType)
+	}
+
+	if seg, ok := replica.normalSegments[segID]; ok {
+		return seg.getSegmentStatslog(pkID, pkType)
+	}
+
+	if seg, ok := replica.flushedSegments[segID]; ok {
+		return seg.getSegmentStatslog(pkID, pkType)
+	}
+
+	return nil, fmt.Errorf("segment not found: %d", segID)
+}
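Segment.getSegmentStatslog amounts to a JSON marshal of storage.PrimaryKeyStats: the pk field ID, pk type, min/max pk, and the segment's bloom filter. A self-contained round-trip sketch with a simplified stand-in struct, since the real PrimaryKeyStats holds PrimaryKey interface values for MinPk/MaxPk and relies on Milvus' own unmarshalling; the JSON tags and the pk-type value 5 (assumed to be schemapb.DataType_Int64) are illustrative:

package main

import (
	"encoding/json"
	"fmt"

	"github.com/bits-and-blooms/bloom/v3"
)

// pkStats is a simplified, int64-only stand-in for storage.PrimaryKeyStats.
type pkStats struct {
	FieldID int64              `json:"fieldID"`
	PkType  int64              `json:"pkType"`
	MinPk   int64              `json:"minPk"`
	MaxPk   int64              `json:"maxPk"`
	BF      *bloom.BloomFilter `json:"bf"` // *bloom.BloomFilter implements json.Marshaler
}

func main() {
	bf := bloom.NewWithEstimates(100000, 0.005)
	bf.Add([]byte("42"))

	in := pkStats{FieldID: 106, PkType: 5, MinPk: 1, MaxPk: 42, BF: bf}
	bs, err := json.Marshal(in) // what getSegmentStatslog hands to the flush path
	if err != nil {
		panic(err)
	}

	var out pkStats
	if err := json.Unmarshal(bs, &out); err != nil {
		panic(err)
	}
	fmt.Println(out.MinPk, out.MaxPk, out.BF.Test([]byte("42"))) // 1 42 true
}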
diff --git a/internal/datanode/segment_replica_test.go b/internal/datanode/segment_replica_test.go
index d6660a7fff..3071487e19 100644
--- a/internal/datanode/segment_replica_test.go
+++ b/internal/datanode/segment_replica_test.go
@@ -22,10 +22,12 @@ import (
 	"fmt"
 	"math/rand"
 	"testing"
+	"time"
 
 	"github.com/bits-and-blooms/bloom/v3"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"github.com/stretchr/testify/suite"
 
 	"github.com/milvus-io/milvus/internal/common"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
@@ -793,6 +795,52 @@ func TestSegmentReplica_UpdatePKRange(t *testing.T) {
 	}
 }
 
+func TestSegment_getSegmentStatslog(t *testing.T) {
+	rand.Seed(time.Now().UnixNano())
+
+	cases := make([][]int64, 0, 100)
+	for i := 0; i < 100; i++ {
+		tc := make([]int64, 0, 10)
+		for j := 0; j < 100; j++ {
+			tc = append(tc, rand.Int63())
+		}
+		cases = append(cases, tc)
+	}
+	buf := make([]byte, 8)
+	for _, tc := range cases {
+		seg := &Segment{
+			pkFilter: bloom.NewWithEstimates(100000, 0.005),
+		}
+
+		seg.updatePKRange(&storage.Int64FieldData{
+			Data: tc,
+		})
+
+		statBytes, err := seg.getSegmentStatslog(1, schemapb.DataType_Int64)
+		assert.NoError(t, err)
+
+		pks := storage.PrimaryKeyStats{}
+		err = json.Unmarshal(statBytes, &pks)
+		require.NoError(t, err)
+
+		assert.Equal(t, int64(1), pks.FieldID)
+		assert.Equal(t, int64(schemapb.DataType_Int64), pks.PkType)
+
+		for _, v := range tc {
+			pk := newInt64PrimaryKey(v)
+			assert.True(t, pks.MinPk.LE(pk))
+			assert.True(t, pks.MaxPk.GE(pk))
+
+			common.Endian.PutUint64(buf, uint64(v))
+			assert.True(t, seg.pkFilter.Test(buf))
+		}
+	}
+
+	pks := &storage.PrimaryKeyStats{}
+	_, err := json.Marshal(pks)
+	assert.NoError(t, err)
+}
+
 func TestReplica_UpdatePKRange(t *testing.T) {
 	rc := &RootCoordFactory{
 		pkType: schemapb.DataType_Int64,
@@ -844,3 +892,102 @@ func TestReplica_UpdatePKRange(t *testing.T) {
 	}
 }
+
+// SegmentReplicaSuite setup test suite for SegmentReplica
+type SegmentReplicaSuite struct {
+	suite.Suite
+	sr *SegmentReplica
+
+	collID    UniqueID
+	partID    UniqueID
+	vchanName string
+	cm        *storage.LocalChunkManager
+}
+
+func (s *SegmentReplicaSuite) SetupSuite() {
+	rc := &RootCoordFactory{
+		pkType: schemapb.DataType_Int64,
+	}
+	s.collID = 1
+	s.cm = storage.NewLocalChunkManager(storage.RootPath(segmentReplicaNodeTestDir))
+	var err error
+	s.sr, err = newReplica(context.Background(), rc, s.cm, s.collID)
+	s.Require().NoError(err)
+}
+
+func (s *SegmentReplicaSuite) TearDownSuite() {
+	s.cm.RemoveWithPrefix("")
+
+}
+
+func (s *SegmentReplicaSuite) SetupTest() {
+	var err error
+	err = s.sr.addNewSegment(1, s.collID, s.partID, s.vchanName, &internalpb.MsgPosition{}, nil)
+	s.Require().NoError(err)
+	err = s.sr.addNormalSegment(2, s.collID, s.partID, s.vchanName, 10, nil, nil, 0)
+	s.Require().NoError(err)
+	err = s.sr.addFlushedSegment(3, s.collID, s.partID, s.vchanName, 10, nil, 0)
+	s.Require().NoError(err)
+}
+
+func (s *SegmentReplicaSuite) TearDownTest() {
+	s.sr.removeSegments(1, 2, 3)
+}
+
+func (s *SegmentReplicaSuite) TestGetSegmentStatslog() {
+	bs, err := s.sr.getSegmentStatslog(1)
+	s.NoError(err)
+
+	segment, ok := s.getSegmentByID(1)
+	s.Require().True(ok)
+	expected, err := segment.getSegmentStatslog(106, schemapb.DataType_Int64)
+	s.Require().NoError(err)
+	s.Equal(expected, bs)
+
+	bs, err = s.sr.getSegmentStatslog(2)
+	s.NoError(err)
+
+	segment, ok = s.getSegmentByID(2)
+	s.Require().True(ok)
+	expected, err = segment.getSegmentStatslog(106, schemapb.DataType_Int64)
+	s.Require().NoError(err)
+	s.Equal(expected, bs)
+
+	bs, err = s.sr.getSegmentStatslog(3)
+	s.NoError(err)
+
+	segment, ok = s.getSegmentByID(3)
+	s.Require().True(ok)
+	expected, err = segment.getSegmentStatslog(106, schemapb.DataType_Int64)
+	s.Require().NoError(err)
+	s.Equal(expected, bs)
+
+	_, err = s.sr.getSegmentStatslog(4)
+	s.Error(err)
+}
+
+func (s *SegmentReplicaSuite) getSegmentByID(id UniqueID) (*Segment, bool) {
+	s.sr.segMu.RLock()
+	defer s.sr.segMu.RUnlock()
+
+	seg, ok := s.sr.newSegments[id]
+	if ok {
+		return seg, true
+	}
+
+	seg, ok = s.sr.normalSegments[id]
+	if ok {
+		return seg, true
+	}
+
+	seg, ok = s.sr.flushedSegments[id]
+	if ok {
+		return seg, true
+	}
+
+	return nil, false
+}
+
+func TestSegmentReplicaSuite(t *testing.T) {
+	suite.Run(t, new(SegmentReplicaSuite))
+}
diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go
index 6322d51353..17450584da 100644
--- a/internal/querynode/segment_loader.go
+++ b/internal/querynode/segment_loader.go
@@ -600,8 +600,15 @@ func (loader *segmentLoader) loadSegmentBloomFilter(segment *Segment, binlogPath
 
 	stats, err := storage.DeserializeStats(blobs)
 	if err != nil {
+		log.Warn("failed to deserialize stats", zap.Error(err))
 		return err
 	}
+	// just one BF, just use it
+	if len(stats) == 1 && stats[0].BF != nil {
+		segment.pkFilter = stats[0].BF
+		return nil
+	}
+	// legacy merge
 	for _, stat := range stats {
 		if stat.BF == nil {
 			log.Warn("stat log with nil bloom filter", zap.Int64("segmentID", segment.segmentID), zap.Any("stat", stat))
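On the read side, loadSegmentBloomFilter now distinguishes the two on-disk generations: a segment flushed by this patch carries exactly one stats entry whose filter can be adopted wholesale, while older segments still merge their per-field filters. A sketch of that control flow with stand-in types; note that bits-and-blooms' Merge only succeeds when both filters share the same m and k, which the legacy per-field filters are expected to satisfy:

package main

import (
	"fmt"

	"github.com/bits-and-blooms/bloom/v3"
)

// pkStat is a stand-in for the entries storage.DeserializeStats returns.
type pkStat struct{ BF *bloom.BloomFilter }

// pickPKFilter mirrors the post-patch control flow of loadSegmentBloomFilter.
func pickPKFilter(dst *bloom.BloomFilter, stats []pkStat) (*bloom.BloomFilter, error) {
	// New format: a single statslog per segment, adopt its filter directly.
	if len(stats) == 1 && stats[0].BF != nil {
		return stats[0].BF, nil
	}
	// Legacy format: merge the per-field filters into the existing one.
	for _, st := range stats {
		if st.BF == nil {
			continue // the real loader logs a warning here
		}
		if err := dst.Merge(st.BF); err != nil { // fails on mismatched m/k
			return nil, err
		}
	}
	return dst, nil
}

func main() {
	single := bloom.NewWithEstimates(1000, 0.005)
	single.Add([]byte("pk-1"))

	bf, err := pickPKFilter(bloom.NewWithEstimates(1000, 0.005), []pkStat{{BF: single}})
	fmt.Println(bf.Test([]byte("pk-1")), err) // true <nil>
}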