diff --git a/internal/datanode/channel_meta.go b/internal/datanode/channel_meta.go
index 6164bde1a0..4abf77d785 100644
--- a/internal/datanode/channel_meta.go
+++ b/internal/datanode/channel_meta.go
@@ -175,7 +175,7 @@ func (c *ChannelMeta) initSegmentBloomFilter(s *Segment) error {
 		return err
 	}
 
-	s.pkFilter = bloom.NewWithEstimates(uint(maxRowCount), maxBloomFalsePositive)
+	s.pkStat.pkFilter = bloom.NewWithEstimates(uint(maxRowCount), maxBloomFalsePositive)
 	return nil
 }
 
@@ -308,12 +308,12 @@ func (c *ChannelMeta) initPKBloomFilter(ctx context.Context, s *Segment, statsBi
 	}
 	for _, stat := range stats {
 		// use first BF to merge
-		if s.pkFilter == nil {
-			s.pkFilter = stat.BF
+		if s.pkStat.pkFilter == nil {
+			s.pkStat.pkFilter = stat.BF
 		} else {
 			// for compatibility, statslog before 2.1.2 uses separated stats log which needs to be merged
 			// assuming all legacy BF has same attributes.
-			err = s.pkFilter.Merge(stat.BF)
+			err = s.pkStat.pkFilter.Merge(stat.BF)
 			if err != nil {
 				return err
 			}
diff --git a/internal/datanode/channel_meta_test.go b/internal/datanode/channel_meta_test.go
index 3f81fac005..e0138f4122 100644
--- a/internal/datanode/channel_meta_test.go
+++ b/internal/datanode/channel_meta_test.go
@@ -782,15 +782,15 @@ func TestChannelMeta_UpdatePKRange(t *testing.T) {
 
 		pk := newInt64PrimaryKey(c)
 
-		assert.Equal(t, true, segNew.minPK.LE(pk))
-		assert.Equal(t, true, segNew.maxPK.GE(pk))
+		assert.Equal(t, true, segNew.pkStat.minPK.LE(pk))
+		assert.Equal(t, true, segNew.pkStat.maxPK.GE(pk))
 
-		assert.Equal(t, true, segNormal.minPK.LE(pk))
-		assert.Equal(t, true, segNormal.maxPK.GE(pk))
+		assert.Equal(t, true, segNormal.pkStat.minPK.LE(pk))
+		assert.Equal(t, true, segNormal.pkStat.maxPK.GE(pk))
 
 		common.Endian.PutUint64(buf, uint64(c))
-		assert.True(t, segNew.pkFilter.Test(buf))
-		assert.True(t, segNormal.pkFilter.Test(buf))
+		assert.True(t, segNew.pkStat.pkFilter.Test(buf))
+		assert.True(t, segNormal.pkStat.pkFilter.Test(buf))
 	}
 }
 
@@ -911,35 +911,47 @@ func (s *ChannelMetaSuite) TestHasSegment() {
 }
 
 func (s *ChannelMetaSuite) TestGetSegmentStatslog() {
+	s.channel.updateSegmentPKRange(1, &storage.Int64FieldData{Data: []int64{1}})
 	bs, err := s.channel.getSegmentStatslog(1)
 	s.NoError(err)
 
 	segment, ok := s.getSegmentByID(1)
 	s.Require().True(ok)
+	err = segment.updatePKRange(&storage.Int64FieldData{Data: []int64{1}})
+	s.Require().NoError(err)
 	expected, err := segment.getSegmentStatslog(106, schemapb.DataType_Int64)
 	s.Require().NoError(err)
 	s.Equal(expected, bs)
 
+	s.channel.updateSegmentPKRange(2, &storage.Int64FieldData{Data: []int64{2}})
 	bs, err = s.channel.getSegmentStatslog(2)
 	s.NoError(err)
 
 	segment, ok = s.getSegmentByID(2)
 	s.Require().True(ok)
+	err = segment.updatePKRange(&storage.Int64FieldData{Data: []int64{2}})
+	s.Require().NoError(err)
 	expected, err = segment.getSegmentStatslog(106, schemapb.DataType_Int64)
 	s.Require().NoError(err)
 	s.Equal(expected, bs)
 
+	s.channel.updateSegmentPKRange(3, &storage.Int64FieldData{Data: []int64{3}})
 	bs, err = s.channel.getSegmentStatslog(3)
 	s.NoError(err)
 
	segment, ok = s.getSegmentByID(3)
 	s.Require().True(ok)
+	err = segment.updatePKRange(&storage.Int64FieldData{Data: []int64{3}})
+	s.Require().NoError(err)
 	expected, err = segment.getSegmentStatslog(106, schemapb.DataType_Int64)
 	s.Require().NoError(err)
 	s.Equal(expected, bs)
 
 	_, err = s.channel.getSegmentStatslog(4)
 	s.Error(err)
+
+	_, err = s.channel.getSegmentStatslog(1)
+	s.ErrorIs(err, errSegmentStatsNotChanged)
 }
 
 func (s *ChannelMetaSuite) getSegmentByID(id UniqueID) (*Segment, bool) {
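Note on the filter being moved under pkStat in the hunks above: it is the bits-and-blooms/bloom/v3 filter, and the code here relies only on its NewWithEstimates, Add/Test, AddString/TestString, and Merge calls. A minimal standalone sketch of those calls (the 100000/0.005 sizing mirrors the tests; binary.LittleEndian is assumed here as a stand-in for the common.Endian used in the real code):

package main

import (
	"encoding/binary"
	"fmt"

	"github.com/bits-and-blooms/bloom/v3"
)

func main() {
	// Same constructor as initSegmentBloomFilter: estimated element count
	// plus target false-positive rate.
	bf := bloom.NewWithEstimates(100000, 0.005)

	// int64 pks are serialized to 8 bytes before insertion, as in the tests.
	buf := make([]byte, 8)
	binary.LittleEndian.PutUint64(buf, uint64(42))
	bf.Add(buf)
	fmt.Println(bf.Test(buf)) // true; bloom filters may false-positive but never false-negative

	// varchar pks use the string variants.
	bf.AddString("pk-varchar")
	fmt.Println(bf.TestString("pk-varchar")) // true

	// Legacy per-file filters are combined with Merge, as in initPKBloomFilter;
	// Merge requires both filters to share the same parameters.
	legacy := bloom.NewWithEstimates(100000, 0.005)
	legacy.AddString("legacy-pk")
	if err := bf.Merge(legacy); err != nil {
		panic(err)
	}
	fmt.Println(bf.TestString("legacy-pk")) // true after merge
}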
diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go
index 0395bb5762..a6c3c3521d 100644
--- a/internal/datanode/compactor.go
+++ b/internal/datanode/compactor.go
@@ -397,17 +397,20 @@ func (t *compactionTask) merge(
 
 	// marshal segment statslog
 	segStats, err := segment.getSegmentStatslog(pkID, pkType)
-	if err != nil {
+	if err != nil && !errors.Is(err, errSegmentStatsNotChanged) {
 		log.Warn("failed to generate segment statslog", zap.Int64("pkID", pkID), zap.Error(err))
 		return nil, nil, nil, 0, err
 	}
 
-	uploadStatsStart := time.Now()
-	statsPaths, err := t.uploadStatsLog(ctxTimeout, targetSegID, partID, segStats, meta)
-	if err != nil {
-		return nil, nil, nil, 0, err
+	var statsPaths []*datapb.FieldBinlog
+	if len(segStats) > 0 {
+		uploadStatsStart := time.Now()
+		statsPaths, err = t.uploadStatsLog(ctxTimeout, targetSegID, partID, segStats, meta)
+		if err != nil {
+			return nil, nil, nil, 0, err
+		}
+		uploadStatsTimeCost += time.Since(uploadStatsStart)
 	}
-	uploadStatsTimeCost += time.Since(uploadStatsStart)
 
 	log.Debug("merge end", zap.Int64("remaining insert numRows", numRows),
 		zap.Int64("expired entities", expired), zap.Int("binlog file number", numBinlogs),
diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go
index 7bd11b0136..1c7575e2f1 100644
--- a/internal/datanode/compactor_test.go
+++ b/internal/datanode/compactor_test.go
@@ -379,7 +379,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 		assert.NoError(t, err)
 		assert.Equal(t, int64(0), numOfRow)
 		assert.Equal(t, 0, len(inPaths))
-		assert.Equal(t, 1, len(statsPaths))
+		assert.Equal(t, 0, len(statsPaths))
 	})
 
 	t.Run("Merge with meta error", func(t *testing.T) {
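The merge path above now treats the "not changed" sentinel as benign and skips the upload when there is no payload, which is what flips the expected statsPaths length from 1 to 0 in the test. A condensed sketch of that guard, with uploadStatsLog as a hypothetical stand-in for t.uploadStatsLog:

package main

import (
	"errors"
	"fmt"
)

var errSegmentStatsNotChanged = errors.New("segment stats not changed")

// uploadStatsLog is a hypothetical stand-in for t.uploadStatsLog.
func uploadStatsLog(segStats []byte) error {
	fmt.Printf("uploading %d bytes of statslog\n", len(segStats))
	return nil
}

// writeStatsIfChanged mirrors the guard in compactionTask.merge: the
// sentinel is tolerated, and the upload only runs for a non-empty payload.
func writeStatsIfChanged(segStats []byte, statsErr error) error {
	if statsErr != nil && !errors.Is(statsErr, errSegmentStatsNotChanged) {
		return statsErr // a real statslog failure still aborts the merge
	}
	if len(segStats) == 0 {
		return nil // stats unchanged since the last statslog: skip the upload
	}
	return uploadStatsLog(segStats)
}

func main() {
	// Unchanged stats: no upload, no error.
	err := writeStatsIfChanged(nil, fmt.Errorf("%w segment %d", errSegmentStatsNotChanged, 1))
	fmt.Println(err) // <nil>

	// Changed stats: the upload proceeds.
	_ = writeStatsIfChanged([]byte(`{"fieldID":106}`), nil)
}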
diff --git a/internal/datanode/errors.go b/internal/datanode/errors.go
index 259356ee28..048e5abaff 100644
--- a/internal/datanode/errors.go
+++ b/internal/datanode/errors.go
@@ -21,6 +21,11 @@ import (
 	"fmt"
 )
 
+var (
+	// errSegmentStatsNotChanged is returned when segment statistics have not changed since the last statslog was generated.
+	errSegmentStatsNotChanged = errors.New("segment stats not changed")
+)
+
 func msgDataNodeIsUnhealthy(nodeID UniqueID) string {
 	return fmt.Sprintf("DataNode %d is not ready", nodeID)
 }
diff --git a/internal/datanode/flow_graph_delete_node.go b/internal/datanode/flow_graph_delete_node.go
index dde049a6b7..82b6ebb546 100644
--- a/internal/datanode/flow_graph_delete_node.go
+++ b/internal/datanode/flow_graph_delete_node.go
@@ -310,10 +310,10 @@ func (dn *deleteNode) filterSegmentByPK(partID UniqueID, pks []primaryKey, tss [
 			case schemapb.DataType_Int64:
 				int64Pk := pk.(*int64PrimaryKey)
 				common.Endian.PutUint64(buf, uint64(int64Pk.Value))
-				exist = segment.pkFilter.Test(buf)
+				exist = segment.pkStat.pkFilter.Test(buf)
 			case schemapb.DataType_VarChar:
 				varCharPk := pk.(*varCharPrimaryKey)
-				exist = segment.pkFilter.TestString(varCharPk.Value)
+				exist = segment.pkStat.pkFilter.TestString(varCharPk.Value)
 			default:
 				//TODO::
 			}
diff --git a/internal/datanode/flow_graph_delete_node_test.go b/internal/datanode/flow_graph_delete_node_test.go
index a8eb05d590..c7d234fe33 100644
--- a/internal/datanode/flow_graph_delete_node_test.go
+++ b/internal/datanode/flow_graph_delete_node_test.go
@@ -101,9 +101,9 @@ func genMockChannel(segIDs []int64, pks []primaryKey, chanName string) *ChannelM
 		}
 		seg.setType(segTypes[i])
 		if i < 3 {
-			seg.pkFilter = filter0
+			seg.pkStat.pkFilter = filter0
 		} else {
-			seg.pkFilter = filter1
+			seg.pkStat.pkFilter = filter1
 		}
 		channel.segments[segIDs[i]] = &seg
 	}
diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go
index fc4bc9a89b..3cbd1ba788 100644
--- a/internal/datanode/flow_graph_insert_buffer_node.go
+++ b/internal/datanode/flow_graph_insert_buffer_node.go
@@ -403,7 +403,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
 			)
 
 			segStats, err := ibNode.channel.getSegmentStatslog(task.segmentID)
-			if err != nil {
+			if err != nil && !errors.Is(err, errSegmentStatsNotChanged) {
 				log.Error("failed to get segment stats log", zap.Int64("segmentID", task.segmentID), zap.Error(err))
 				panic(err)
 			}
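errors.Is works at both call sites above because getStatslog (in segment.go below) wraps the sentinel with %w rather than formatting it into a plain string. A minimal demonstration:

package main

import (
	"errors"
	"fmt"
)

var errSegmentStatsNotChanged = errors.New("segment stats not changed")

func main() {
	// segment.go wraps the sentinel with %w, attaching the segment ID
	// without breaking errors.Is at the call sites patched above.
	err := fmt.Errorf("%w segment %d", errSegmentStatsNotChanged, 42)

	fmt.Println(err)                                       // segment stats not changed segment 42
	fmt.Println(errors.Is(err, errSegmentStatsNotChanged)) // true
}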
diff --git a/internal/datanode/flush_manager.go b/internal/datanode/flush_manager.go
index b70263368b..2b1bc2db57 100644
--- a/internal/datanode/flush_manager.go
+++ b/internal/datanode/flush_manager.go
@@ -405,21 +405,24 @@ func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segStats []by
 	field2Stats := make(map[UniqueID]*datapb.Binlog)
 
 	// write stats binlog
-	pkID := getPKID(meta)
-	if pkID == common.InvalidFieldID {
-		return fmt.Errorf("failed to get pk id for segment %d", segmentID)
-	}
+	// a non-empty segStats payload means the segment stats changed since the last statslog
+	if len(segStats) > 0 {
+		pkID := getPKID(meta)
+		if pkID == common.InvalidFieldID {
+			return fmt.Errorf("failed to get pk id for segment %d", segmentID)
+		}
 
-	logidx := start + int64(len(binLogs))
-	k := metautil.JoinIDPath(collID, partID, segmentID, pkID, logidx)
-	key := path.Join(m.ChunkManager.RootPath(), common.SegmentStatslogPath, k)
-	kvs[key] = segStats
-	field2Stats[pkID] = &datapb.Binlog{
-		EntriesNum:    0,
-		TimestampFrom: 0, //TODO
-		TimestampTo:   0, //TODO,
-		LogPath:       key,
-		LogSize:       int64(len(segStats)),
+		logidx := start + int64(len(binLogs))
+		k := metautil.JoinIDPath(collID, partID, segmentID, pkID, logidx)
+		key := path.Join(m.ChunkManager.RootPath(), common.SegmentStatslogPath, k)
+		kvs[key] = segStats
+		field2Stats[pkID] = &datapb.Binlog{
+			EntriesNum:    0,
+			TimestampFrom: 0, //TODO
+			TimestampTo:   0, //TODO
+			LogPath:       key,
+			LogSize:       int64(len(segStats)),
+		}
 	}
 
 	m.handleInsertTask(segmentID, &flushBufferInsertTask{
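The statslog key assembled above follows the same ID-path convention as the other binlogs. A sketch of the resulting path shape, with joinIDPath as a local stand-in for metautil.JoinIDPath and "files"/"stats_log" as illustrative values for m.ChunkManager.RootPath() and common.SegmentStatslogPath:

package main

import (
	"fmt"
	"path"
	"strconv"
	"strings"
)

// joinIDPath is a local stand-in for metautil.JoinIDPath: it joins the
// given IDs into a slash-separated path segment.
func joinIDPath(ids ...int64) string {
	parts := make([]string, 0, len(ids))
	for _, id := range ids {
		parts = append(parts, strconv.FormatInt(id, 10))
	}
	return strings.Join(parts, "/")
}

func main() {
	// root/statsPath/collID/partID/segmentID/pkFieldID/logidx
	k := joinIDPath(111, 222, 333, 106, 10)
	key := path.Join("files", "stats_log", k)
	fmt.Println(key) // files/stats_log/111/222/333/106/10
}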
diff --git a/internal/datanode/segment.go b/internal/datanode/segment.go
index 5ab70949a6..b69828fae6 100644
--- a/internal/datanode/segment.go
+++ b/internal/datanode/segment.go
@@ -18,6 +18,8 @@ package datanode
 
 import (
 	"encoding/json"
+	"errors"
+	"fmt"
 	"sync/atomic"
 
 	"github.com/bits-and-blooms/bloom/v3"
@@ -42,12 +44,92 @@ type Segment struct {
 	memorySize   int64
 	compactedTo  UniqueID
 
+	pkStat pkStatistics
+
 	startPos *internalpb.MsgPosition // TODO readonly
 	endPos   *internalpb.MsgPosition
+}
 
-	pkFilter *bloom.BloomFilter // bloom filter of pk inside a segment
-	minPK    primaryKey         // minimal pk value, shortcut for checking whether a pk is inside this segment
-	maxPK    primaryKey         // maximal pk value, same above
+// pkStatistics contains statistic information for the pk field
+type pkStatistics struct {
+	statsChanged bool               // set when statistics change, cleared once a statslog is generated
+	pkFilter     *bloom.BloomFilter // bloom filter of pk inside a segment
+	minPK        primaryKey         // minimal pk value, shortcut for checking whether a pk is inside this segment
+	maxPK        primaryKey         // maximal pk value, same as above
+}
+
+// update sets the pk min/max values when the input value falls outside the former range.
+func (st *pkStatistics) update(pk primaryKey) error {
+	if st == nil {
+		return errors.New("nil pk statistics")
+	}
+	if st.minPK == nil {
+		st.minPK = pk
+	} else if st.minPK.GT(pk) {
+		st.minPK = pk
+	}
+
+	if st.maxPK == nil {
+		st.maxPK = pk
+	} else if st.maxPK.LT(pk) {
+		st.maxPK = pk
+	}
+
+	return nil
+}
+
+func (st *pkStatistics) updatePKRange(ids storage.FieldData) error {
+	switch pks := ids.(type) {
+	case *storage.Int64FieldData:
+		buf := make([]byte, 8)
+		for _, pk := range pks.Data {
+			id := storage.NewInt64PrimaryKey(pk)
+			err := st.update(id)
+			if err != nil {
+				return err
+			}
+			common.Endian.PutUint64(buf, uint64(pk))
+			st.pkFilter.Add(buf)
+		}
+	case *storage.StringFieldData:
+		for _, pk := range pks.Data {
+			id := storage.NewVarCharPrimaryKey(pk)
+			err := st.update(id)
+			if err != nil {
+				return err
+			}
+			st.pkFilter.AddString(pk)
+		}
+	default:
+		return fmt.Errorf("invalid data type for primary key: %T", ids)
+	}
+
+	// mark statistics as updated
+	st.statsChanged = true
+
+	return nil
+}
+
+// getStatslog returns the marshaled statslog content if anything changed since the last call.
+// The statslog is marshaled as JSON.
+func (st *pkStatistics) getStatslog(segmentID, pkID UniqueID, pkType schemapb.DataType) ([]byte, error) {
+	if !st.statsChanged {
+		return nil, fmt.Errorf("%w segment %d", errSegmentStatsNotChanged, segmentID)
+	}
+
+	pks := storage.PrimaryKeyStats{
+		FieldID: pkID,
+		PkType:  int64(pkType),
+		MaxPk:   st.maxPK,
+		MinPk:   st.minPK,
+		BF:      st.pkFilter,
+	}
+
+	bs, err := json.Marshal(pks)
+	if err == nil {
+		st.statsChanged = false
+	}
+	return bs, err
 }
 
 type addSegmentReq struct {
@@ -61,19 +143,7 @@
 }
 
 func (s *Segment) updatePk(pk primaryKey) error {
-	if s.minPK == nil {
-		s.minPK = pk
-	} else if s.minPK.GT(pk) {
-		s.minPK = pk
-	}
-
-	if s.maxPK == nil {
-		s.maxPK = pk
-	} else if s.maxPK.LT(pk) {
-		s.maxPK = pk
-	}
-
-	return nil
+	return s.pkStat.update(pk)
 }
 
 func (s *Segment) isValid() bool {
@@ -93,46 +163,23 @@ func (s *Segment) setType(t datapb.SegmentType) {
 }
 
 func (s *Segment) updatePKRange(ids storage.FieldData) error {
-	switch pks := ids.(type) {
-	case *storage.Int64FieldData:
-		buf := make([]byte, 8)
-		for _, pk := range pks.Data {
-			id := storage.NewInt64PrimaryKey(pk)
-			err := s.updatePk(id)
-			if err != nil {
-				return err
-			}
-			common.Endian.PutUint64(buf, uint64(pk))
-			s.pkFilter.Add(buf)
-		}
-	case *storage.StringFieldData:
-		for _, pk := range pks.Data {
-			id := storage.NewVarCharPrimaryKey(pk)
-			err := s.updatePk(id)
-			if err != nil {
-				return err
-			}
-			s.pkFilter.AddString(pk)
-		}
-	default:
-		//TODO::
+	log := log.With(zap.Int64("collectionID", s.collectionID),
+		zap.Int64("partitionID", s.partitionID),
+		zap.Int64("segmentID", s.segmentID),
+	)
+
+	err := s.pkStat.updatePKRange(ids)
+	if err != nil {
+		log.Warn("failed to updatePKRange", zap.Error(err))
+		return err
 	}
 
 	log.Info("update pk range",
-		zap.Int64("collectionID", s.collectionID), zap.Int64("partitionID", s.partitionID), zap.Int64("segmentID", s.segmentID),
-		zap.Int64("num_rows", s.numRows), zap.Any("minPK", s.minPK), zap.Any("maxPK", s.maxPK))
+		zap.Int64("num_rows", s.numRows), zap.Any("minPK", s.pkStat.minPK), zap.Any("maxPK", s.pkStat.maxPK))
 
 	return nil
 }
 
 func (s *Segment) getSegmentStatslog(pkID UniqueID, pkType schemapb.DataType) ([]byte, error) {
-	pks := storage.PrimaryKeyStats{
-		FieldID: pkID,
-		PkType:  int64(pkType),
-		MaxPk:   s.maxPK,
-		MinPk:   s.minPK,
-		BF:      s.pkFilter,
-	}
-
-	return json.Marshal(pks)
+	return s.pkStat.getStatslog(s.segmentID, pkID, pkType)
 }
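The contract introduced by pkStatistics, condensed: updatePKRange flips statsChanged, a successful getStatslog clears it, and a second call with no intervening update returns the sentinel. A toy reproduction with int64 bounds in place of the primaryKey interface and the bloom filter omitted:

package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

var errSegmentStatsNotChanged = errors.New("segment stats not changed")

// toyStats is a simplified stand-in for pkStatistics: int64 bounds
// instead of the primaryKey interface, and no bloom filter.
type toyStats struct {
	statsChanged bool
	hasPK        bool
	minPK, maxPK int64
}

func (st *toyStats) update(pk int64) {
	if !st.hasPK {
		st.minPK, st.maxPK, st.hasPK = pk, pk, true
	} else {
		if pk < st.minPK {
			st.minPK = pk
		}
		if pk > st.maxPK {
			st.maxPK = pk
		}
	}
	st.statsChanged = true // set at the end of updatePKRange in the real code
}

func (st *toyStats) getStatslog() ([]byte, error) {
	if !st.statsChanged {
		return nil, fmt.Errorf("%w", errSegmentStatsNotChanged)
	}
	bs, err := json.Marshal(map[string]int64{"minPk": st.minPK, "maxPk": st.maxPK})
	if err == nil {
		st.statsChanged = false // only a successful marshal resets the flag
	}
	return bs, err
}

func main() {
	st := &toyStats{}
	st.update(3)
	st.update(1)

	bs, err := st.getStatslog()
	fmt.Println(string(bs), err) // {"maxPk":3,"minPk":1} <nil>

	_, err = st.getStatslog()
	fmt.Println(errors.Is(err, errSegmentStatsNotChanged)) // true: no update since last call
}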
diff --git a/internal/datanode/segment_test.go b/internal/datanode/segment_test.go
index 43b07f0425..35239c78dd 100644
--- a/internal/datanode/segment_test.go
+++ b/internal/datanode/segment_test.go
@@ -33,7 +33,9 @@ import (
 
 func TestSegment_UpdatePKRange(t *testing.T) {
 	seg := &Segment{
-		pkFilter: bloom.NewWithEstimates(100000, 0.005),
+		pkStat: pkStatistics{
+			pkFilter: bloom.NewWithEstimates(100000, 0.005),
+		},
 	}
 
 	cases := make([]int64, 0, 100)
@@ -48,11 +50,11 @@ func TestSegment_UpdatePKRange(t *testing.T) {
 
 		pk := newInt64PrimaryKey(c)
 
-		assert.Equal(t, true, seg.minPK.LE(pk))
-		assert.Equal(t, true, seg.maxPK.GE(pk))
+		assert.Equal(t, true, seg.pkStat.minPK.LE(pk))
+		assert.Equal(t, true, seg.pkStat.maxPK.GE(pk))
 
 		common.Endian.PutUint64(buf, uint64(c))
-		assert.True(t, seg.pkFilter.Test(buf))
+		assert.True(t, seg.pkStat.pkFilter.Test(buf))
 	}
 }
 
@@ -70,8 +72,10 @@ func TestSegment_getSegmentStatslog(t *testing.T) {
 	buf := make([]byte, 8)
 	for _, tc := range cases {
 		seg := &Segment{
-			pkFilter: bloom.NewWithEstimates(100000, 0.005),
-		}
+			pkStat: pkStatistics{
+				pkFilter: bloom.NewWithEstimates(100000, 0.005),
+			},
+		}
 		seg.updatePKRange(&storage.Int64FieldData{
 			Data: tc,
 		})
@@ -93,7 +96,7 @@ func TestSegment_getSegmentStatslog(t *testing.T) {
 		assert.True(t, pks.MaxPk.GE(pk))
 
 		common.Endian.PutUint64(buf, uint64(v))
-		assert.True(t, seg.pkFilter.Test(buf))
+		assert.True(t, seg.pkStat.pkFilter.Test(buf))
 	}
 }
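For reference, the round trip the test above exercises: the marshaled statslog can be unmarshaled and the restored bloom filter re-queried, since bloom/v3's BloomFilter supports JSON marshaling. A self-contained analogue (toyPKStats only mimics the shape of storage.PrimaryKeyStats; the field names are illustrative, not the storage package's actual schema):

package main

import (
	"encoding/binary"
	"encoding/json"
	"fmt"

	"github.com/bits-and-blooms/bloom/v3"
)

// toyPKStats mimics the shape of the marshaled statslog; the field names
// are illustrative only.
type toyPKStats struct {
	MinPK int64              `json:"minPk"`
	MaxPK int64              `json:"maxPk"`
	BF    *bloom.BloomFilter `json:"bf"`
}

func main() {
	bf := bloom.NewWithEstimates(100000, 0.005)
	buf := make([]byte, 8)
	for _, pk := range []int64{1, 10, 100} {
		binary.LittleEndian.PutUint64(buf, uint64(pk))
		bf.Add(buf)
	}

	bs, err := json.Marshal(toyPKStats{MinPK: 1, MaxPK: 100, BF: bf})
	if err != nil {
		panic(err)
	}

	var restored toyPKStats
	if err := json.Unmarshal(bs, &restored); err != nil {
		panic(err)
	}

	// Same assertions as the test: the bounds hold and an inserted pk
	// still hits the restored filter.
	binary.LittleEndian.PutUint64(buf, uint64(10))
	fmt.Println(restored.MinPK <= 10, restored.MaxPK >= 10, restored.BF.Test(buf)) // true true true
}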