Verify and adjust segment # of rows before saving and passing SegmentInfo (#21200)

issue: #18120

/kind enhancement

Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>
pull/21963/head
Ten Thousand Leaves 2023-02-03 14:19:52 +08:00 committed by GitHub
parent 161725a661
commit 423c1c65a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 227 additions and 23 deletions

View File

@ -30,6 +30,8 @@ import (
"testing"
"time"
"github.com/milvus-io/milvus/internal/util/metautil"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
@ -547,6 +549,26 @@ func TestGetSegmentInfo(t *testing.T) {
segInfo := &datapb.SegmentInfo{
ID: 0,
State: commonpb.SegmentState_Flushed,
NumOfRows: 100,
Binlogs: []*datapb.FieldBinlog{
{
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 801),
},
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 802),
},
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 803),
},
},
},
},
}
err := svr.meta.AddSegment(NewSegmentInfo(segInfo))
assert.Nil(t, err)
@ -555,6 +577,9 @@ func TestGetSegmentInfo(t *testing.T) {
SegmentIDs: []int64{0},
}
resp, err := svr.GetSegmentInfo(svr.ctx, req)
assert.Equal(t, 1, len(resp.GetInfos()))
// Check that # of rows is corrected from 100 to 60.
assert.EqualValues(t, 60, resp.GetInfos()[0].GetNumOfRows())
assert.Nil(t, err)
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
@ -2318,7 +2343,41 @@ func TestGetRecoveryInfo(t *testing.T) {
assert.NoError(t, err)
seg1 := createSegment(0, 0, 0, 100, 10, "vchan1", commonpb.SegmentState_Flushed)
seg1.Binlogs = []*datapb.FieldBinlog{
{
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 901),
},
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 902),
},
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 903),
},
},
},
}
seg2 := createSegment(1, 0, 0, 100, 20, "vchan1", commonpb.SegmentState_Flushed)
seg2.Binlogs = []*datapb.FieldBinlog{
{
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
EntriesNum: 30,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 801),
},
{
EntriesNum: 70,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 802),
},
},
},
}
err = svr.meta.AddSegment(NewSegmentInfo(seg1))
assert.Nil(t, err)
err = svr.meta.AddSegment(NewSegmentInfo(seg2))
@ -2362,8 +2421,11 @@ func TestGetRecoveryInfo(t *testing.T) {
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.EqualValues(t, 1, len(resp.GetChannels()))
assert.EqualValues(t, 0, len(resp.GetChannels()[0].GetUnflushedSegmentIds()))
//assert.ElementsMatch(t, []*datapb.SegmentInfo{trimSegmentInfo(seg1), trimSegmentInfo(seg2)}, resp.GetChannels()[0].GetFlushedSegments())
assert.ElementsMatch(t, []int64{0, 1}, resp.GetChannels()[0].GetFlushedSegmentIds())
assert.EqualValues(t, 10, resp.GetChannels()[0].GetSeekPosition().GetTimestamp())
assert.EqualValues(t, 2, len(resp.GetBinlogs()))
// Row count corrected from 100 + 100 -> 100 + 60.
assert.EqualValues(t, 160, resp.GetBinlogs()[0].GetNumOfRows()+resp.GetBinlogs()[1].GetNumOfRows())
})
t.Run("test get recovery of unflushed segments ", func(t *testing.T) {
@ -2386,7 +2448,41 @@ func TestGetRecoveryInfo(t *testing.T) {
assert.NoError(t, err)
seg1 := createSegment(3, 0, 0, 100, 30, "vchan1", commonpb.SegmentState_Growing)
seg1.Binlogs = []*datapb.FieldBinlog{
{
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 901),
},
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 902),
},
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 903),
},
},
},
}
seg2 := createSegment(4, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Growing)
seg2.Binlogs = []*datapb.FieldBinlog{
{
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
EntriesNum: 30,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 801),
},
{
EntriesNum: 70,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 802),
},
},
},
}
err = svr.meta.AddSegment(NewSegmentInfo(seg1))
assert.Nil(t, err)
err = svr.meta.AddSegment(NewSegmentInfo(seg2))
@ -2884,7 +2980,7 @@ type rootCoordSegFlushComplete struct {
flag bool
}
//SegmentFlushCompleted, override default behavior
// SegmentFlushCompleted, override default behavior
func (rc *rootCoordSegFlushComplete) SegmentFlushCompleted(ctx context.Context, req *datapb.SegmentFlushCompletedMsg) (*commonpb.Status, error) {
if rc.flag {
return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil

View File

@ -23,25 +23,24 @@ import (
"strconv"
"sync"
"github.com/milvus-io/milvus/internal/util/commonpbutil"
"github.com/milvus-io/milvus/internal/util/errorutil"
"golang.org/x/sync/errgroup"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/commonpbutil"
"github.com/milvus-io/milvus/internal/util/errorutil"
"github.com/milvus-io/milvus/internal/util/logutil"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/segmentutil"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/samber/lo"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
// checks whether server in Healthy State
@ -335,6 +334,7 @@ func (s *Server) GetSegmentInfoChannel(ctx context.Context) (*milvuspb.StringRes
}
// GetSegmentInfo returns segment info requested, status, row count, etc included
// Called by: QueryCoord, DataNode, IndexCoord, Proxy.
func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) {
resp := &datapb.GetSegmentInfoResponse{
Status: &commonpb.Status{
@ -359,20 +359,22 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR
}
child := s.meta.GetCompactionTo(id)
clonedInfo := info.Clone()
if child != nil {
info = info.Clone()
info.Deltalogs = append(info.Deltalogs, child.GetDeltalogs()...)
info.DmlPosition = child.GetDmlPosition()
clonedInfo.Deltalogs = append(clonedInfo.Deltalogs, child.GetDeltalogs()...)
clonedInfo.DmlPosition = child.GetDmlPosition()
}
infos = append(infos, info.SegmentInfo)
segmentutil.ReCalcRowCount(info.SegmentInfo, clonedInfo.SegmentInfo)
infos = append(infos, clonedInfo.SegmentInfo)
} else {
info = s.meta.GetSegment(id)
if info == nil {
resp.Status.Reason = msgSegmentNotFound(id)
return resp, nil
}
infos = append(infos, info.SegmentInfo)
clonedInfo := info.Clone()
segmentutil.ReCalcRowCount(info.SegmentInfo, clonedInfo.SegmentInfo)
infos = append(infos, clonedInfo.SegmentInfo)
}
vchannel := info.InsertChannel
if _, ok := channelCPs[vchannel]; vchannel != "" && !ok {
@ -591,7 +593,8 @@ func (s *Server) GetComponentStates(ctx context.Context) (*milvuspb.ComponentSta
return resp, nil
}
// GetRecoveryInfo get recovery info for segment
// GetRecoveryInfo get recovery info for segment.
// Called by: QueryCoord.
func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInfoRequest) (*datapb.GetRecoveryInfoResponse, error) {
collectionID := req.GetCollectionID()
partitionID := req.GetPartitionID()
@ -678,7 +681,15 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf
segment2Binlogs[id] = append(segment2Binlogs[id], fieldBinlogs)
}
if newCount := segmentutil.CalcRowCountFromBinLog(segment.SegmentInfo); newCount != segment.NumOfRows {
log.Warn("segment row number meta inconsistent with bin log row count and will be corrected",
zap.Int64("segment ID", segment.GetID()),
zap.Int64("segment meta row count (wrong)", segment.GetNumOfRows()),
zap.Int64("segment bin log row count (correct)", newCount))
segmentsNumOfRows[id] = newCount
} else {
segmentsNumOfRows[id] = segment.NumOfRows
}
statsBinlogs := segment.GetStatslogs()
field2StatsBinlog := make(map[UniqueID][]*datapb.Binlog)

View File

@ -23,10 +23,9 @@ import (
"sync"
"time"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/types"

View File

@ -35,6 +35,7 @@ import (
"github.com/milvus-io/milvus/internal/util"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/metautil"
"github.com/milvus-io/milvus/internal/util/segmentutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
@ -229,6 +230,25 @@ func (kc *Catalog) AddSegment(ctx context.Context, segment *datapb.SegmentInfo)
return kc.MetaKv.MultiSave(kvs)
}
// LoadFromSegmentPath loads segment info from persistent storage by given segment path.
// # TESTING ONLY #
func (kc *Catalog) LoadFromSegmentPath(colID, partID, segID typeutil.UniqueID) (*datapb.SegmentInfo, error) {
v, err := kc.MetaKv.Load(buildSegmentPath(colID, partID, segID))
if err != nil {
log.Error("(testing only) failed to load segment info by segment path")
return nil, err
}
segInfo := &datapb.SegmentInfo{}
err = proto.Unmarshal([]byte(v), segInfo)
if err != nil {
log.Error("(testing only) failed to unmarshall segment info")
return nil, err
}
return segInfo, nil
}
func (kc *Catalog) AlterSegments(ctx context.Context, newSegments []*datapb.SegmentInfo) error {
if len(newSegments) == 0 {
return nil
@ -318,7 +338,8 @@ func (kc *Catalog) AlterSegmentsAndAddNewSegment(ctx context.Context, segments [
for _, s := range segments {
noBinlogsSegment, binlogs, deltalogs, statslogs := CloneSegmentWithExcludeBinlogs(s)
// `s` is not mutated above. Also, `noBinlogsSegment` is a cloned version of `s`.
segmentutil.ReCalcRowCount(s, noBinlogsSegment)
// for compacted segments
if noBinlogsSegment.State == commonpb.SegmentState_Dropped {
hasBinlogkeys, err := kc.hasBinlogPrefix(s)
@ -408,6 +429,8 @@ func (kc *Catalog) SaveDroppedSegmentsInBatch(ctx context.Context, segments []*d
for _, s := range segments {
key := buildSegmentPath(s.GetCollectionID(), s.GetPartitionID(), s.GetID())
noBinlogsSegment, _, _, _ := CloneSegmentWithExcludeBinlogs(s)
// `s` is not mutated above. Also, `noBinlogsSegment` is a cloned version of `s`.
segmentutil.ReCalcRowCount(s, noBinlogsSegment)
segBytes, err := proto.Marshal(noBinlogsSegment)
if err != nil {
return fmt.Errorf("failed to marshal segment: %d, err: %w", s.GetID(), err)
@ -666,6 +689,8 @@ func buildBinlogKvsWithLogID(collectionID, partitionID, segmentID typeutil.Uniqu
func buildSegmentAndBinlogsKvs(segment *datapb.SegmentInfo) (map[string]string, error) {
noBinlogsSegment, binlogs, deltalogs, statslogs := CloneSegmentWithExcludeBinlogs(segment)
// `segment` is not mutated above. Also, `noBinlogsSegment` is a cloned version of `segment`.
segmentutil.ReCalcRowCount(segment, noBinlogsSegment)
// save binlogs separately
kvs, err := buildBinlogKvsWithLogID(noBinlogsSegment.CollectionID, noBinlogsSegment.PartitionID, noBinlogsSegment.ID, binlogs, deltalogs, statslogs)

View File

@ -24,8 +24,8 @@ type MockedTxnKV struct {
multiSave func(kvs map[string]string) error
save func(key, value string) error
loadWithPrefix func(key string) ([]string, []string, error)
multiRemove func(keys []string) error
load func(key string) (string, error)
multiRemove func(keys []string) error
walkWithPrefix func(prefix string, paginationSize int, fn func([]byte, []byte) error) error
remove func(key string) error
}
@ -311,15 +311,26 @@ func Test_AddSegments(t *testing.T) {
t.Run("save successfully", func(t *testing.T) {
txn := &MockedTxnKV{}
var savedKvs map[string]string
savedKvs := make(map[string]string)
txn.multiSave = func(kvs map[string]string) error {
savedKvs = kvs
return nil
}
txn.load = func(key string) (string, error) {
if v, ok := savedKvs[key]; ok {
return v, nil
}
return "", errors.New("key not found")
}
catalog := NewCatalog(txn, rootPath, "")
err := catalog.AddSegment(context.TODO(), segment1)
assert.Nil(t, err)
adjustedSeg, err := catalog.LoadFromSegmentPath(segment1.CollectionID, segment1.PartitionID, segment1.ID)
assert.NoError(t, err)
// Check that num of rows is corrected from 100 to 5.
assert.Equal(t, int64(100), segment1.GetNumOfRows())
assert.Equal(t, int64(5), adjustedSeg.GetNumOfRows())
_, ok := savedKvs[k4]
assert.False(t, ok)
@ -387,6 +398,12 @@ func Test_AlterSegments(t *testing.T) {
opGroupCount++
return nil
}
txn.load = func(key string) (string, error) {
if v, ok := savedKvs[key]; ok {
return v, nil
}
return "", errors.New("key not found")
}
catalog := NewCatalog(txn, rootPath, "")
err := catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{})
@ -420,6 +437,12 @@ func Test_AlterSegments(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, 255+3, len(savedKvs))
assert.Equal(t, 5, opGroupCount)
adjustedSeg, err := catalog.LoadFromSegmentPath(segmentXL.CollectionID, segmentXL.PartitionID, segmentXL.ID)
assert.NoError(t, err)
// Check that num of rows is corrected from 100 to 1275.
assert.Equal(t, int64(100), segmentXL.GetNumOfRows())
assert.Equal(t, int64(5), adjustedSeg.GetNumOfRows())
})
}
@ -453,10 +476,15 @@ func Test_AlterSegmentsAndAddNewSegment(t *testing.T) {
maps.Copy(savedKvs, kvs)
return nil
}
txn.loadWithPrefix = func(key string) ([]string, []string, error) {
return []string{}, []string{}, nil
}
txn.load = func(key string) (string, error) {
if v, ok := savedKvs[key]; ok {
return v, nil
}
return "", errors.New("key not found")
}
catalog := NewCatalog(txn, rootPath, "")
err := catalog.AlterSegmentsAndAddNewSegment(context.TODO(), []*datapb.SegmentInfo{droppedSegment}, segment1)
@ -465,6 +493,18 @@ func Test_AlterSegmentsAndAddNewSegment(t *testing.T) {
assert.Equal(t, 8, len(savedKvs))
verifySavedKvsForDroppedSegment(t, savedKvs)
verifySavedKvsForSegment(t, savedKvs)
adjustedSeg, err := catalog.LoadFromSegmentPath(droppedSegment.CollectionID, droppedSegment.PartitionID, droppedSegment.ID)
assert.NoError(t, err)
// Check that num of rows is corrected from 100 to 5.
assert.Equal(t, int64(100), droppedSegment.GetNumOfRows())
assert.Equal(t, int64(5), adjustedSeg.GetNumOfRows())
adjustedSeg, err = catalog.LoadFromSegmentPath(segment1.CollectionID, segment1.PartitionID, segment1.ID)
assert.NoError(t, err)
// Check that num of rows is corrected from 100 to 5.
assert.Equal(t, int64(100), droppedSegment.GetNumOfRows())
assert.Equal(t, int64(5), adjustedSeg.GetNumOfRows())
})
}

View File

@ -0,0 +1,33 @@
package segmentutil
import (
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/datapb"
"go.uber.org/zap"
)
// ReCalcRowCount re-calculates number of rows of `oldSeg` based on its bin log count, and correct its value in its
// cloned copy, which is `newSeg`.
// Note that `segCloned` should be a copied version of `seg`.
func ReCalcRowCount(seg, segCloned *datapb.SegmentInfo) {
// `segment` is not mutated but only cloned above and is safe to be referred here.
if newCount := CalcRowCountFromBinLog(seg); newCount != seg.GetNumOfRows() {
log.Warn("segment row number meta inconsistent with bin log row count and will be corrected",
zap.Int64("segment ID", seg.GetID()),
zap.Int64("segment meta row count (wrong)", seg.GetNumOfRows()),
zap.Int64("segment bin log row count (correct)", newCount))
// Update the corrected row count.
segCloned.NumOfRows = newCount
}
}
// CalcRowCountFromBinLog calculates # of rows of a segment from bin logs
func CalcRowCountFromBinLog(seg *datapb.SegmentInfo) int64 {
var rowCt int64
if len(seg.GetBinlogs()) > 0 {
for _, ct := range seg.GetBinlogs()[0].GetBinlogs() {
rowCt += ct.GetEntriesNum()
}
}
return rowCt
}