mirror of https://github.com/milvus-io/milvus.git
Keep segment statslog if no insert applied (#19910)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
parent 96cf74d53a
commit 0095869d5b
@@ -175,7 +175,7 @@ func (c *ChannelMeta) initSegmentBloomFilter(s *Segment) error {
 		return err
 	}
 
-	s.pkFilter = bloom.NewWithEstimates(uint(maxRowCount), maxBloomFalsePositive)
+	s.pkStat.pkFilter = bloom.NewWithEstimates(uint(maxRowCount), maxBloomFalsePositive)
 	return nil
 }
 
@@ -308,12 +308,12 @@ func (c *ChannelMeta) initPKBloomFilter(ctx context.Context, s *Segment, statsBi
 	}
 	for _, stat := range stats {
 		// use first BF to merge
-		if s.pkFilter == nil {
-			s.pkFilter = stat.BF
+		if s.pkStat.pkFilter == nil {
+			s.pkStat.pkFilter = stat.BF
 		} else {
 			// for compatibility, statslog before 2.1.2 uses separated stats log which needs to be merged
 			// assuming all legacy BF has same attributes.
-			err = s.pkFilter.Merge(stat.BF)
+			err = s.pkStat.pkFilter.Merge(stat.BF)
 			if err != nil {
 				return err
 			}
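For context on the bloom filter calls in this hunk: the bits-and-blooms/bloom/v3 package used here provides NewWithEstimates, Add/Test, AddString/TestString, and Merge (which requires both filters to share the same parameters, as the compatibility comment above assumes for legacy per-statslog filters). A minimal standalone sketch; the values are made up, and binary.LittleEndian stands in for Milvus's common.Endian, which is only an assumption here:

package main

import (
	"encoding/binary"
	"fmt"

	"github.com/bits-and-blooms/bloom/v3"
)

func main() {
	// Two filters built with identical estimates, so Merge is legal.
	a := bloom.NewWithEstimates(100000, 0.005)
	b := bloom.NewWithEstimates(100000, 0.005)

	// Int64 pks are serialized to 8 bytes before insertion.
	buf := make([]byte, 8)
	binary.LittleEndian.PutUint64(buf, uint64(42))
	a.Add(buf)

	// VarChar pks go in directly as strings.
	b.AddString("pk-007")

	// Merge fails if the filters' m and k differ.
	if err := a.Merge(b); err != nil {
		panic(err)
	}
	fmt.Println(a.Test(buf), a.TestString("pk-007")) // true true
}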
@@ -782,15 +782,15 @@ func TestChannelMeta_UpdatePKRange(t *testing.T) {
 
 		pk := newInt64PrimaryKey(c)
 
-		assert.Equal(t, true, segNew.minPK.LE(pk))
-		assert.Equal(t, true, segNew.maxPK.GE(pk))
+		assert.Equal(t, true, segNew.pkStat.minPK.LE(pk))
+		assert.Equal(t, true, segNew.pkStat.maxPK.GE(pk))
 
-		assert.Equal(t, true, segNormal.minPK.LE(pk))
-		assert.Equal(t, true, segNormal.maxPK.GE(pk))
+		assert.Equal(t, true, segNormal.pkStat.minPK.LE(pk))
+		assert.Equal(t, true, segNormal.pkStat.maxPK.GE(pk))
 
 		common.Endian.PutUint64(buf, uint64(c))
-		assert.True(t, segNew.pkFilter.Test(buf))
-		assert.True(t, segNormal.pkFilter.Test(buf))
+		assert.True(t, segNew.pkStat.pkFilter.Test(buf))
+		assert.True(t, segNormal.pkStat.pkFilter.Test(buf))
 
 	}
 
@@ -911,35 +911,47 @@ func (s *ChannelMetaSuite) TestHasSegment() {
 }
 
 func (s *ChannelMetaSuite) TestGetSegmentStatslog() {
+	s.channel.updateSegmentPKRange(1, &storage.Int64FieldData{Data: []int64{1}})
 	bs, err := s.channel.getSegmentStatslog(1)
 	s.NoError(err)
 
 	segment, ok := s.getSegmentByID(1)
 	s.Require().True(ok)
+	err = segment.updatePKRange(&storage.Int64FieldData{Data: []int64{1}})
+	s.Require().NoError(err)
 	expected, err := segment.getSegmentStatslog(106, schemapb.DataType_Int64)
 	s.Require().NoError(err)
 	s.Equal(expected, bs)
 
+	s.channel.updateSegmentPKRange(2, &storage.Int64FieldData{Data: []int64{2}})
 	bs, err = s.channel.getSegmentStatslog(2)
 	s.NoError(err)
 
 	segment, ok = s.getSegmentByID(2)
 	s.Require().True(ok)
+	err = segment.updatePKRange(&storage.Int64FieldData{Data: []int64{2}})
+	s.Require().NoError(err)
 	expected, err = segment.getSegmentStatslog(106, schemapb.DataType_Int64)
 	s.Require().NoError(err)
 	s.Equal(expected, bs)
 
+	s.channel.updateSegmentPKRange(3, &storage.Int64FieldData{Data: []int64{3}})
 	bs, err = s.channel.getSegmentStatslog(3)
 	s.NoError(err)
 
 	segment, ok = s.getSegmentByID(3)
 	s.Require().True(ok)
+	err = segment.updatePKRange(&storage.Int64FieldData{Data: []int64{3}})
+	s.Require().NoError(err)
 	expected, err = segment.getSegmentStatslog(106, schemapb.DataType_Int64)
 	s.Require().NoError(err)
 	s.Equal(expected, bs)
 
 	_, err = s.channel.getSegmentStatslog(4)
 	s.Error(err)
+
+	_, err = s.channel.getSegmentStatslog(1)
+	s.ErrorIs(err, errSegmentStatsNotChanged)
 }
 
 func (s *ChannelMetaSuite) getSegmentByID(id UniqueID) (*Segment, bool) {
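A side note on the assertion styles mixed in this test: s.Require().NoError(err) aborts the test method immediately on failure, while s.NoError(err) records the failure and keeps going. A minimal testify-suite sketch of that split (DemoSuite is a made-up example type):

package demo

import (
	"testing"

	"github.com/stretchr/testify/suite"
)

type DemoSuite struct {
	suite.Suite
}

func (s *DemoSuite) TestAssertStyles() {
	s.Require().True(true) // a failure here stops the test method at once
	s.NoError(nil)         // a failure here is recorded; execution continues
}

func TestDemoSuite(t *testing.T) {
	suite.Run(t, new(DemoSuite))
}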
@@ -397,17 +397,20 @@ func (t *compactionTask) merge(
 
 	// marshal segment statslog
 	segStats, err := segment.getSegmentStatslog(pkID, pkType)
-	if err != nil {
+	if err != nil && !errors.Is(err, errSegmentStatsNotChanged) {
 		log.Warn("failed to generate segment statslog", zap.Int64("pkID", pkID), zap.Error(err))
 		return nil, nil, nil, 0, err
 	}
 
-	uploadStatsStart := time.Now()
-	statsPaths, err := t.uploadStatsLog(ctxTimeout, targetSegID, partID, segStats, meta)
-	if err != nil {
-		return nil, nil, nil, 0, err
+	var statsPaths []*datapb.FieldBinlog
+	if len(segStats) > 0 {
+		uploadStatsStart := time.Now()
+		statsPaths, err = t.uploadStatsLog(ctxTimeout, targetSegID, partID, segStats, meta)
+		if err != nil {
+			return nil, nil, nil, 0, err
+		}
+		uploadStatsTimeCost += time.Since(uploadStatsStart)
 	}
-	uploadStatsTimeCost += time.Since(uploadStatsStart)
 
 	log.Debug("merge end", zap.Int64("remaining insert numRows", numRows),
 		zap.Int64("expired entities", expired), zap.Int("binlog file number", numBinlogs),
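The shape of this change: getSegmentStatslog now reports "no change" through a sentinel error, and merge treats that case as "skip the upload" rather than a failure. A runnable sketch of the pattern, with marshalStats and the printed "upload" as hypothetical stand-ins for segment.getSegmentStatslog and t.uploadStatsLog:

package main

import (
	"errors"
	"fmt"
)

var errStatsNotChanged = errors.New("segment stats not changed")

// marshalStats stands in for segment.getSegmentStatslog.
func marshalStats(changed bool) ([]byte, error) {
	if !changed {
		// wrap the sentinel so callers can still match it with errors.Is
		return nil, fmt.Errorf("%w segment %d", errStatsNotChanged, 1)
	}
	return []byte(`{"fieldID":106}`), nil
}

func merge(changed bool) error {
	segStats, err := marshalStats(changed)
	if err != nil && !errors.Is(err, errStatsNotChanged) {
		return err // only a real failure aborts the merge
	}
	// the sentinel path leaves segStats empty, so the upload is skipped
	if len(segStats) > 0 {
		fmt.Println("uploading statslog:", string(segStats))
	} else {
		fmt.Println("stats unchanged: keep the previous statslog")
	}
	return nil
}

func main() {
	_ = merge(true)
	_ = merge(false)
}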
@@ -379,7 +379,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 			assert.NoError(t, err)
 			assert.Equal(t, int64(0), numOfRow)
 			assert.Equal(t, 0, len(inPaths))
-			assert.Equal(t, 1, len(statsPaths))
+			assert.Equal(t, 0, len(statsPaths))
 		})
 
 		t.Run("Merge with meta error", func(t *testing.T) {
@@ -21,6 +21,11 @@ import (
 	"fmt"
 )
 
+var (
+	// errSegmentStatsNotChanged error stands for segment stats not changed.
+	errSegmentStatsNotChanged = errors.New("segment stats not changed")
+)
+
 func msgDataNodeIsUnhealthy(nodeID UniqueID) string {
 	return fmt.Sprintf("DataNode %d is not ready", nodeID)
 }
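Worth noting why callers can match this sentinel even after getStatslog decorates it with a segment ID: fmt.Errorf with the %w verb keeps the wrapped error reachable by errors.Is, whereas %v would break the chain. A quick illustration:

package main

import (
	"errors"
	"fmt"
)

var errSegmentStatsNotChanged = errors.New("segment stats not changed")

func main() {
	// %w wraps: the sentinel stays in the error chain.
	wrapped := fmt.Errorf("%w segment %d", errSegmentStatsNotChanged, 42)
	// %v only formats: the sentinel's identity is lost.
	opaque := fmt.Errorf("%v segment %d", errSegmentStatsNotChanged, 42)

	fmt.Println(errors.Is(wrapped, errSegmentStatsNotChanged)) // true
	fmt.Println(errors.Is(opaque, errSegmentStatsNotChanged))  // false
}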
@@ -310,10 +310,10 @@ func (dn *deleteNode) filterSegmentByPK(partID UniqueID, pks []primaryKey, tss [
 		case schemapb.DataType_Int64:
 			int64Pk := pk.(*int64PrimaryKey)
 			common.Endian.PutUint64(buf, uint64(int64Pk.Value))
-			exist = segment.pkFilter.Test(buf)
+			exist = segment.pkStat.pkFilter.Test(buf)
 		case schemapb.DataType_VarChar:
 			varCharPk := pk.(*varCharPrimaryKey)
-			exist = segment.pkFilter.TestString(varCharPk.Value)
+			exist = segment.pkStat.pkFilter.TestString(varCharPk.Value)
 		default:
 			//TODO::
 		}
@@ -101,9 +101,9 @@ func genMockChannel(segIDs []int64, pks []primaryKey, chanName string) *ChannelM
 		}
 		seg.setType(segTypes[i])
 		if i < 3 {
-			seg.pkFilter = filter0
+			seg.pkStat.pkFilter = filter0
 		} else {
-			seg.pkFilter = filter1
+			seg.pkStat.pkFilter = filter1
 		}
 		channel.segments[segIDs[i]] = &seg
 	}
@@ -403,7 +403,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
 		)
 
 		segStats, err := ibNode.channel.getSegmentStatslog(task.segmentID)
-		if err != nil {
+		if err != nil && !errors.Is(err, errSegmentStatsNotChanged) {
 			log.Error("failed to get segment stats log", zap.Int64("segmentID", task.segmentID), zap.Error(err))
 			panic(err)
 		}
@@ -405,21 +405,24 @@ func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segStats []by
 	field2Stats := make(map[UniqueID]*datapb.Binlog)
 	// write stats binlog
 
-	pkID := getPKID(meta)
-	if pkID == common.InvalidFieldID {
-		return fmt.Errorf("failed to get pk id for segment %d", segmentID)
-	}
+	// if segStats content is not nil, means segment stats changed
+	if len(segStats) > 0 {
+		pkID := getPKID(meta)
+		if pkID == common.InvalidFieldID {
+			return fmt.Errorf("failed to get pk id for segment %d", segmentID)
+		}
 
-	logidx := start + int64(len(binLogs))
-	k := metautil.JoinIDPath(collID, partID, segmentID, pkID, logidx)
-	key := path.Join(m.ChunkManager.RootPath(), common.SegmentStatslogPath, k)
-	kvs[key] = segStats
-	field2Stats[pkID] = &datapb.Binlog{
-		EntriesNum:    0,
-		TimestampFrom: 0, //TODO
-		TimestampTo:   0, //TODO,
-		LogPath:       key,
-		LogSize:       int64(len(segStats)),
+		logidx := start + int64(len(binLogs))
+		k := metautil.JoinIDPath(collID, partID, segmentID, pkID, logidx)
+		key := path.Join(m.ChunkManager.RootPath(), common.SegmentStatslogPath, k)
+		kvs[key] = segStats
+		field2Stats[pkID] = &datapb.Binlog{
+			EntriesNum:    0,
+			TimestampFrom: 0, //TODO
+			TimestampTo:   0, //TODO,
+			LogPath:       key,
+			LogSize:       int64(len(segStats)),
+		}
 	}
 
 	m.handleInsertTask(segmentID, &flushBufferInsertTask{
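The statslog object key above is assembled from an ID path joined under the storage root. A sketch of the resulting layout, where joinIDPath is a hypothetical stand-in for metautil.JoinIDPath and the root/prefix values are assumed for illustration:

package main

import (
	"fmt"
	"path"
	"strconv"
)

// joinIDPath is a hypothetical stand-in for metautil.JoinIDPath.
func joinIDPath(ids ...int64) string {
	parts := make([]string, 0, len(ids))
	for _, id := range ids {
		parts = append(parts, strconv.FormatInt(id, 10))
	}
	return path.Join(parts...)
}

func main() {
	// assumed example values for the chunk manager root and statslog prefix
	root, statslogPrefix := "files", "stats_log"
	collID, partID, segID, pkID, logidx := int64(1), int64(2), int64(3), int64(106), int64(0)

	key := path.Join(root, statslogPrefix, joinIDPath(collID, partID, segID, pkID, logidx))
	fmt.Println(key) // files/stats_log/1/2/3/106/0
}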
@@ -18,6 +18,8 @@ package datanode
 
 import (
+	"encoding/json"
+	"errors"
 	"fmt"
 	"sync/atomic"
 
 	"github.com/bits-and-blooms/bloom/v3"
@@ -42,12 +44,92 @@ type Segment struct {
 	memorySize  int64
 	compactedTo UniqueID
 
-	pkFilter *bloom.BloomFilter // bloom filter of pk inside a segment
-	minPK    primaryKey         // minimal pk value, shortcut for checking whether a pk is inside this segment
-	maxPK    primaryKey         // maximal pk value, same above
+	pkStat pkStatistics
 
 	startPos *internalpb.MsgPosition // TODO readonly
 	endPos   *internalpb.MsgPosition
 }
 
+// pkStatistics contains pk field statistic information
+type pkStatistics struct {
+	statsChanged bool               // statistic changed
+	pkFilter     *bloom.BloomFilter // bloom filter of pk inside a segment
+	minPK        primaryKey         // minimal pk value, shortcut for checking whether a pk is inside this segment
+	maxPK        primaryKey         // maximal pk value, same above
+}
+
+// update set pk min/max value if input value is beyond former range.
+func (st *pkStatistics) update(pk primaryKey) error {
+	if st == nil {
+		return errors.New("nil pk statistics")
+	}
+	if st.minPK == nil {
+		st.minPK = pk
+	} else if st.minPK.GT(pk) {
+		st.minPK = pk
+	}
+
+	if st.maxPK == nil {
+		st.maxPK = pk
+	} else if st.maxPK.LT(pk) {
+		st.maxPK = pk
+	}
+
+	return nil
+}
+
+func (st *pkStatistics) updatePKRange(ids storage.FieldData) error {
+	switch pks := ids.(type) {
+	case *storage.Int64FieldData:
+		buf := make([]byte, 8)
+		for _, pk := range pks.Data {
+			id := storage.NewInt64PrimaryKey(pk)
+			err := st.update(id)
+			if err != nil {
+				return err
+			}
+			common.Endian.PutUint64(buf, uint64(pk))
+			st.pkFilter.Add(buf)
+		}
+	case *storage.StringFieldData:
+		for _, pk := range pks.Data {
+			id := storage.NewVarCharPrimaryKey(pk)
+			err := st.update(id)
+			if err != nil {
+				return err
+			}
+			st.pkFilter.AddString(pk)
+		}
+	default:
+		return fmt.Errorf("invalid data type for primary key: %T", ids)
+	}
+
+	// mark statistic updated
+	st.statsChanged = true
+
+	return nil
+}
+
+// getStatslog return marshaled statslog content if there is any change since last call.
+// statslog is marshaled as json.
+func (st *pkStatistics) getStatslog(segmentID, pkID UniqueID, pkType schemapb.DataType) ([]byte, error) {
+	if !st.statsChanged {
+		return nil, fmt.Errorf("%w segment %d", errSegmentStatsNotChanged, segmentID)
+	}
+
+	pks := storage.PrimaryKeyStats{
+		FieldID: pkID,
+		PkType:  int64(pkType),
+		MaxPk:   st.maxPK,
+		MinPk:   st.minPK,
+		BF:      st.pkFilter,
+	}
+
+	bs, err := json.Marshal(pks)
+	if err == nil {
+		st.statsChanged = false
+	}
+	return bs, err
+}
+
 type addSegmentReq struct {
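The statsChanged field is the heart of the commit: updatePKRange sets it, a successful getStatslog clears it, so a second call with no intervening insert yields the sentinel instead of rewriting an identical statslog. A trimmed sketch of that lifecycle, using a simplified stats type rather than the actual storage.PrimaryKeyStats:

package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

var errStatsNotChanged = errors.New("segment stats not changed")

type stats struct {
	statsChanged bool
	hasPK        bool
	MinPK, MaxPK int64
}

func (st *stats) update(pk int64) {
	if !st.hasPK || pk < st.MinPK {
		st.MinPK = pk
	}
	if !st.hasPK || pk > st.MaxPK {
		st.MaxPK = pk
	}
	st.hasPK = true
	st.statsChanged = true // dirty: the next getStatslog must emit
}

func (st *stats) getStatslog() ([]byte, error) {
	if !st.statsChanged {
		return nil, errStatsNotChanged
	}
	bs, err := json.Marshal(st)
	if err == nil {
		st.statsChanged = false // clean until the next update
	}
	return bs, err
}

func main() {
	st := &stats{}
	st.update(7)
	bs, _ := st.getStatslog()
	fmt.Println(string(bs)) // emitted once
	_, err := st.getStatslog()
	fmt.Println(errors.Is(err, errStatsNotChanged)) // true: nothing new to flush
}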
@@ -61,19 +143,7 @@ type addSegmentReq struct {
 }
 
 func (s *Segment) updatePk(pk primaryKey) error {
-	if s.minPK == nil {
-		s.minPK = pk
-	} else if s.minPK.GT(pk) {
-		s.minPK = pk
-	}
-
-	if s.maxPK == nil {
-		s.maxPK = pk
-	} else if s.maxPK.LT(pk) {
-		s.maxPK = pk
-	}
-
-	return nil
+	return s.pkStat.update(pk)
 }
 
 func (s *Segment) isValid() bool {
@@ -93,46 +163,22 @@ func (s *Segment) setType(t datapb.SegmentType) {
 }
 
 func (s *Segment) updatePKRange(ids storage.FieldData) error {
-	switch pks := ids.(type) {
-	case *storage.Int64FieldData:
-		buf := make([]byte, 8)
-		for _, pk := range pks.Data {
-			id := storage.NewInt64PrimaryKey(pk)
-			err := s.updatePk(id)
-			if err != nil {
-				return err
-			}
-			common.Endian.PutUint64(buf, uint64(pk))
-			s.pkFilter.Add(buf)
-		}
-	case *storage.StringFieldData:
-		for _, pk := range pks.Data {
-			id := storage.NewVarCharPrimaryKey(pk)
-			err := s.updatePk(id)
-			if err != nil {
-				return err
-			}
-			s.pkFilter.AddString(pk)
-		}
-	default:
-		//TODO::
-	}
+	log := log.With(zap.Int64("collectionID", s.collectionID),
+		zap.Int64("partitionID", s.partitionID),
+		zap.Int64("segmentID", s.segmentID),
+	)
+
+	err := s.pkStat.updatePKRange(ids)
+	if err != nil {
+		log.Warn("failed to updatePKRange", zap.Error(err))
+	}
 
 	log.Info("update pk range",
-		zap.Int64("collectionID", s.collectionID), zap.Int64("partitionID", s.partitionID), zap.Int64("segmentID", s.segmentID),
-		zap.Int64("num_rows", s.numRows), zap.Any("minPK", s.minPK), zap.Any("maxPK", s.maxPK))
+		zap.Int64("num_rows", s.numRows), zap.Any("minPK", s.pkStat.minPK), zap.Any("maxPK", s.pkStat.maxPK))
 
 	return nil
 }
 
 func (s *Segment) getSegmentStatslog(pkID UniqueID, pkType schemapb.DataType) ([]byte, error) {
-	pks := storage.PrimaryKeyStats{
-		FieldID: pkID,
-		PkType:  int64(pkType),
-		MaxPk:   s.maxPK,
-		MinPk:   s.minPK,
-		BF:      s.pkFilter,
-	}
-
-	return json.Marshal(pks)
+	return s.pkStat.getStatslog(s.segmentID, pkID, pkType)
 }
@@ -33,7 +33,9 @@ import (
 
 func TestSegment_UpdatePKRange(t *testing.T) {
 	seg := &Segment{
-		pkFilter: bloom.NewWithEstimates(100000, 0.005),
+		pkStat: pkStatistics{
+			pkFilter: bloom.NewWithEstimates(100000, 0.005),
+		},
 	}
 
 	cases := make([]int64, 0, 100)
@@ -48,11 +50,11 @@ func TestSegment_UpdatePKRange(t *testing.T) {
 
 		pk := newInt64PrimaryKey(c)
 
-		assert.Equal(t, true, seg.minPK.LE(pk))
-		assert.Equal(t, true, seg.maxPK.GE(pk))
+		assert.Equal(t, true, seg.pkStat.minPK.LE(pk))
+		assert.Equal(t, true, seg.pkStat.maxPK.GE(pk))
 
 		common.Endian.PutUint64(buf, uint64(c))
-		assert.True(t, seg.pkFilter.Test(buf))
+		assert.True(t, seg.pkStat.pkFilter.Test(buf))
 	}
 }
 
@@ -70,8 +72,9 @@ func TestSegment_getSegmentStatslog(t *testing.T) {
 	buf := make([]byte, 8)
 	for _, tc := range cases {
 		seg := &Segment{
-			pkFilter: bloom.NewWithEstimates(100000, 0.005),
-		}
+			pkStat: pkStatistics{
+				pkFilter: bloom.NewWithEstimates(100000, 0.005),
+			}}
 
 		seg.updatePKRange(&storage.Int64FieldData{
 			Data: tc,
@@ -93,7 +96,7 @@ func TestSegment_getSegmentStatslog(t *testing.T) {
 		assert.True(t, pks.MaxPk.GE(pk))
 
 		common.Endian.PutUint64(buf, uint64(v))
-		assert.True(t, seg.pkFilter.Test(buf))
+		assert.True(t, seg.pkStat.pkFilter.Test(buf))
 	}
 }
 