Reduce rpc size for GetRecoveryInfoV2 (#27483)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
Xiaofan 2023-10-23 21:44:09 +08:00 committed by GitHub
parent 6060dd7ea8
commit 2ea7579dbb
10 changed files with 441 additions and 64 deletions

This change shrinks the GetRecoveryInfoV2 RPC: DataCoord compresses binlog, deltalog, and statslog entries down to their numeric log IDs (CompressBinLog) before sending, and QueryCoord rebuilds the full paths on receipt (DecompressBinLog). Alongside that, ChannelManager.GetChannels is renamed to GetAssignedChannels (freeing the old name for the store-level accessor), checkBinlogs reports errors instead of panicking, and gRPC client compression is enabled by default.


@ -443,7 +443,7 @@ grpc:
serverMaxSendSize: 536870912
serverMaxRecvSize: 536870912
client:
compressionEnabled: false
compressionEnabled: true
dialTimeout: 200
keepAliveTime: 10000
keepAliveTimeout: 20000
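
For context, a minimal sketch of what client-side compression typically means in grpc-go (UseCompressor and the gzip encoding package are standard grpc-go APIs; the exact wiring of this flag inside Milvus may differ):

package example

import (
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/encoding/gzip" // importing registers the gzip compressor
)

// dialCompressed is a hypothetical helper: with compression enabled, every
// outgoing RPC on the connection is gzip-compressed, trading a little CPU
// for much smaller payloads such as the GetRecoveryInfoV2 response.
func dialCompressed(addr string) (*grpc.ClientConn, error) {
	return grpc.Dial(addr,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name)),
	)
}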


@ -491,8 +491,8 @@ func (c *ChannelManager) fillChannelWatchInfoWithState(op *ChannelOp, state data
return channelsWithTimer
}
// GetChannels gets channels info of registered nodes.
func (c *ChannelManager) GetChannels() []*NodeChannelInfo {
// GetAssignedChannels gets channels info of registered nodes.
func (c *ChannelManager) GetAssignedChannels() []*NodeChannelInfo {
c.mu.RLock()
defer c.mu.RUnlock()
@ -501,13 +501,14 @@ func (c *ChannelManager) GetChannels() []*NodeChannelInfo {
func (c *ChannelManager) GetChannelsByCollectionID(collectionID UniqueID) []*channel {
channels := make([]*channel, 0)
for _, nodeChannels := range c.store.GetNodesChannels() {
for _, nodeChannels := range c.store.GetChannels() {
for _, channelInfo := range nodeChannels.Channels {
if collectionID == channelInfo.CollectionID {
channels = append(channels, channelInfo)
}
}
}
log.Info("get channel", zap.Any("collection", collectionID), zap.Any("channel", channels))
return channels
}
@ -899,7 +900,7 @@ func (c *ChannelManager) getChannelByNodeAndName(nodeID UniqueID, channelName st
}
func (c *ChannelManager) getNodeIDByChannelName(chName string) (bool, UniqueID) {
for _, nodeChannel := range c.GetChannels() {
for _, nodeChannel := range c.GetAssignedChannels() {
for _, ch := range nodeChannel.Channels {
if ch.Name == chName {
return true, nodeChannel.NodeID


@ -132,7 +132,7 @@ func (suite *ClusterSuite) TestCreate() {
err = cluster.Startup(ctx, []*NodeInfo{{NodeID: 1, Address: "localhost:9999"}})
suite.NoError(err)
channels := channelManager.GetChannels()
channels := channelManager.GetAssignedChannels()
suite.EqualValues([]*NodeChannelInfo{{1, []*channel{{Name: "channel1", CollectionID: 1}}}}, channels)
})
@ -181,7 +181,7 @@ func (suite *ClusterSuite) TestCreate() {
suite.EqualValues(1, len(sessions))
suite.EqualValues(2, sessions[0].info.NodeID)
suite.EqualValues(addr, sessions[0].info.Address)
channels := channelManager2.GetChannels()
channels := channelManager2.GetAssignedChannels()
suite.EqualValues(1, len(channels))
suite.EqualValues(2, channels[0].NodeID)
})
@ -253,7 +253,7 @@ func (suite *ClusterSuite) TestRegister() {
suite.NoError(err)
bufferChannels := channelManager.GetBufferChannels()
suite.Empty(bufferChannels.Channels)
nodeChannels := channelManager.GetChannels()
nodeChannels := channelManager.GetAssignedChannels()
suite.EqualValues(1, len(nodeChannels))
suite.EqualValues(1, nodeChannels[0].NodeID)
suite.EqualValues("ch1", nodeChannels[0].Channels[0].Name)
@ -287,7 +287,7 @@ func (suite *ClusterSuite) TestRegister() {
suite.NoError(err)
restartCluster := NewCluster(sessionManager2, channelManager2)
defer restartCluster.Close()
channels := channelManager2.GetChannels()
channels := channelManager2.GetAssignedChannels()
suite.Empty(channels)
suite.MetricsEqual(metrics.DataCoordNumDataNodes, 1)
@ -352,7 +352,7 @@ func (suite *ClusterSuite) TestUnregister() {
err = cluster.UnRegister(nodeInfo1)
suite.NoError(err)
channels := channelManager.GetChannels()
channels := channelManager.GetAssignedChannels()
suite.EqualValues(1, len(channels))
suite.EqualValues(2, channels[0].NodeID)
suite.EqualValues(1, len(channels[0].Channels))
@ -386,7 +386,7 @@ func (suite *ClusterSuite) TestUnregister() {
suite.NoError(err)
err = cluster.UnRegister(nodeInfo)
suite.NoError(err)
channels := channelManager.GetChannels()
channels := channelManager.GetAssignedChannels()
suite.Empty(channels)
channel := channelManager.GetBufferChannels()
suite.NotNil(channel)
@ -433,7 +433,7 @@ func TestWatchIfNeeded(t *testing.T) {
assert.NoError(t, err)
err = cluster.Watch(ctx, "ch1", 1)
assert.NoError(t, err)
channels := channelManager.GetChannels()
channels := channelManager.GetAssignedChannels()
assert.EqualValues(t, 1, len(channels))
assert.EqualValues(t, "ch1", channels[0].Channels[0].Name)
})
@ -452,7 +452,7 @@ func TestWatchIfNeeded(t *testing.T) {
err = cluster.Watch(ctx, "ch1", 1)
assert.NoError(t, err)
channels := channelManager.GetChannels()
channels := channelManager.GetAssignedChannels()
assert.Empty(t, channels)
channel := channelManager.GetBufferChannels()
assert.NotNil(t, channel)


@ -4209,6 +4209,7 @@ var globalTestTikv = tikv.SetupLocalTxn()
func newTestServer(t *testing.T, receiveCh chan any, opts ...Option) *Server {
var err error
paramtable.Get().Save(Params.CommonCfg.DataCoordTimeTick.Key, Params.CommonCfg.DataCoordTimeTick.GetValue()+strconv.Itoa(rand.Int()))
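// assumed: "0,0,0,0,0" selects no compression for every rocksmq level, keeping the test MQ path deterministic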
paramtable.Get().Save(Params.RocksmqCfg.CompressionTypes.Key, "0,0,0,0,0")
factory := dependency.NewDefaultFactory(true)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd.GetAsBool(),


@ -32,6 +32,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/segmentutil"
@ -115,7 +116,7 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.F
var isUnimplemented bool
err = retry.Do(ctx, func() error {
for _, channelInfo := range s.channelManager.GetChannels() {
for _, channelInfo := range s.channelManager.GetAssignedChannels() {
nodeID := channelInfo.NodeID
channels := lo.Filter(channelInfo.Channels, func(channel *channel, _ int) bool {
return channel.CollectionID == req.GetCollectionID()
@ -817,15 +818,37 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI
rowCount = segment.NumOfRows
}
// compress binlog paths into log IDs to cut the size of the response
binLogs, err := datacoord.CompressBinLog(segment.Binlogs)
if err != nil {
log.Warn("failed to compress segment", zap.Int64("segmentID", id), zap.Error(err))
resp.Status = merr.Status(err)
return resp, nil
}
deltaLogs, err := datacoord.CompressBinLog(segment.Deltalogs)
if err != nil {
log.Warn("failed to compress segment", zap.Int64("segmentID", id), zap.Error(err))
resp.Status = merr.Status(err)
return resp, nil
}
statLogs, err := datacoord.CompressBinLog(segment.Statslogs)
if err != nil {
log.Warn("failed to compress segment", zap.Int64("segmentID", id), zap.Error(err))
resp.Status = merr.Status(err)
return resp, nil
}
segmentInfos = append(segmentInfos, &datapb.SegmentInfo{
ID: segment.ID,
PartitionID: segment.PartitionID,
CollectionID: segment.CollectionID,
InsertChannel: segment.InsertChannel,
NumOfRows: rowCount,
Binlogs: segment.Binlogs,
Statslogs: segment.Statslogs,
Deltalogs: segment.Deltalogs,
Binlogs: binLogs,
Statslogs: statLogs,
Deltalogs: deltaLogs,
})
}
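
For intuition, a sketch of what the compression saves per binlog entry (the path and IDs below are made up; real paths come from metautil):

// Before: the full object-storage path travels in the RPC for every binlog.
uncompressed := &datapb.Binlog{
	LogPath: "files/insert_log/449684/449685/449686/101/449687", // tens of bytes
}
// After: only the trailing log ID is sent; the receiver rebuilds the path
// from (rootPath, collectionID, partitionID, segmentID, fieldID, logID).
compressed := &datapb.Binlog{
	LogID: 449687, // a few varint bytes
}
_, _ = uncompressed, compressed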
@ -1223,7 +1246,7 @@ func (s *Server) GetFlushState(ctx context.Context, req *datapb.GetFlushStateReq
}
channels := make([]string, 0)
for _, channelInfo := range s.channelManager.GetChannels() {
for _, channelInfo := range s.channelManager.GetAssignedChannels() {
filtered := lo.Filter(channelInfo.Channels, func(channel *channel, _ int) bool {
return channel.CollectionID == req.GetCollectionID()
})


@ -2,7 +2,6 @@ package datacoord
import (
"context"
"fmt"
"testing"
"github.com/stretchr/testify/assert"
@ -346,10 +345,10 @@ func TestGetRecoveryInfoV2(t *testing.T) {
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
LogPath: "/binlog/file1",
LogPath: metautil.BuildInsertLogPath("a", 0, 100, 0, 1, 801),
},
{
LogPath: "/binlog/file2",
LogPath: metautil.BuildInsertLogPath("a", 0, 100, 0, 1, 801),
},
},
},
@ -359,10 +358,10 @@ func TestGetRecoveryInfoV2(t *testing.T) {
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
LogPath: "/stats_log/file1",
LogPath: metautil.BuildStatsLogPath("a", 0, 100, 0, 1000, 10000),
},
{
LogPath: "/stats_log/file2",
LogPath: metautil.BuildStatsLogPath("a", 0, 100, 0, 1000, 10000),
},
},
},
@ -373,7 +372,7 @@ func TestGetRecoveryInfoV2(t *testing.T) {
{
TimestampFrom: 0,
TimestampTo: 1,
LogPath: "/stats_log/file1",
LogPath: metautil.BuildDeltaLogPath("a", 0, 100, 0, 100000),
LogSize: 1,
},
},
@ -418,13 +417,23 @@ func TestGetRecoveryInfoV2(t *testing.T) {
}
resp, err := svr.GetRecoveryInfoV2(context.TODO(), req)
assert.NoError(t, err)
assert.NoError(t, merr.Error(resp.Status))
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
assert.EqualValues(t, 1, len(resp.GetSegments()))
assert.EqualValues(t, 0, resp.GetSegments()[0].GetID())
assert.EqualValues(t, 1, len(resp.GetSegments()[0].GetBinlogs()))
assert.EqualValues(t, 1, resp.GetSegments()[0].GetBinlogs()[0].GetFieldID())
for i, binlog := range resp.GetSegments()[0].GetBinlogs()[0].GetBinlogs() {
assert.Equal(t, fmt.Sprintf("/binlog/file%d", i+1), binlog.GetLogPath())
for _, binlog := range resp.GetSegments()[0].GetBinlogs()[0].GetBinlogs() {
assert.Equal(t, "", binlog.GetLogPath())
assert.Equal(t, int64(801), binlog.GetLogID())
}
for _, binlog := range resp.GetSegments()[0].GetStatslogs()[0].GetBinlogs() {
assert.Equal(t, "", binlog.GetLogPath())
assert.Equal(t, int64(10000), binlog.GetLogID())
}
for _, binlog := range resp.GetSegments()[0].GetDeltalogs()[0].GetBinlogs() {
assert.Equal(t, "", binlog.GetLogPath())
assert.Equal(t, int64(100000), binlog.GetLogID())
}
})
t.Run("with dropped segments", func(t *testing.T) {
@ -516,6 +525,197 @@ func TestGetRecoveryInfoV2(t *testing.T) {
assert.NotEqual(t, 0, resp.GetChannels()[0].GetSeekPosition().GetTimestamp())
})
t.Run("with failed compress", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) {
return newMockRootCoordClient(), nil
}
svr.meta.AddCollection(&collectionInfo{
ID: 0,
Schema: newTestSchema(),
})
err := svr.meta.UpdateChannelCheckpoint("vchan1", &msgpb.MsgPosition{
ChannelName: "vchan1",
Timestamp: 0,
MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0},
})
assert.NoError(t, err)
svr.meta.AddCollection(&collectionInfo{
ID: 1,
Schema: newTestSchema(),
})
err = svr.meta.UpdateChannelCheckpoint("vchan2", &msgpb.MsgPosition{
ChannelName: "vchan2",
Timestamp: 0,
MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0},
})
assert.NoError(t, err)
svr.meta.AddCollection(&collectionInfo{
ID: 2,
Schema: newTestSchema(),
})
err = svr.meta.UpdateChannelCheckpoint("vchan3", &msgpb.MsgPosition{
ChannelName: "vchan3",
Timestamp: 0,
MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0},
})
assert.NoError(t, err)
svr.channelManager.AddNode(0)
ch := &channel{
Name: "vchan1",
CollectionID: 0,
}
err = svr.channelManager.Watch(context.TODO(), ch)
assert.NoError(t, err)
ch = &channel{
Name: "vchan2",
CollectionID: 1,
}
err = svr.channelManager.Watch(context.TODO(), ch)
assert.NoError(t, err)
ch = &channel{
Name: "vchan3",
CollectionID: 2,
}
err = svr.channelManager.Watch(context.TODO(), ch)
assert.NoError(t, err)
seg := createSegment(8, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Flushed)
binLogPaths := make([]*datapb.Binlog, 1)
// the trailing path element is "mock", not a numeric log ID, so CompressBinLog will fail to parse it
path := metautil.JoinIDPath(0, 0, 8, fieldID)
path = path + "/mock"
binLogPaths[0] = &datapb.Binlog{
EntriesNum: 10000,
LogPath: path,
}
seg.Statslogs = append(seg.Statslogs, &datapb.FieldBinlog{
FieldID: fieldID,
Binlogs: binLogPaths,
})
binLogPaths2 := make([]*datapb.Binlog, 1)
pathCorrect := metautil.JoinIDPath(0, 0, 8, fieldID, 1)
binLogPaths2[0] = &datapb.Binlog{
EntriesNum: 10000,
LogPath: pathCorrect,
}
seg.Binlogs = append(seg.Binlogs, &datapb.FieldBinlog{
FieldID: fieldID,
Binlogs: binLogPaths2,
})
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg))
assert.NoError(t, err)
// make sure collection is indexed
err = svr.meta.CreateIndex(&model.Index{
TenantID: "",
CollectionID: 0,
FieldID: 2,
IndexID: 0,
IndexName: "_default_idx_1",
IsDeleted: false,
CreateTime: 0,
TypeParams: nil,
IndexParams: nil,
IsAutoIndex: false,
UserIndexParams: nil,
})
assert.NoError(t, err)
svr.meta.segments.SetSegmentIndex(seg.ID, &model.SegmentIndex{
SegmentID: seg.ID,
CollectionID: 0,
PartitionID: 0,
NumRows: 100,
IndexID: 0,
BuildID: 0,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreateTime: 0,
IndexFileKeys: nil,
IndexSize: 0,
})
req := &datapb.GetRecoveryInfoRequestV2{
CollectionID: 0,
}
resp, err := svr.GetRecoveryInfoV2(context.TODO(), req)
assert.NoError(t, err)
assert.True(t, resp.Status.ErrorCode == commonpb.ErrorCode_UnexpectedError)
// now trigger the same compression failure via the insert binlogs instead of the statslogs
path = metautil.JoinIDPath(0, 0, 9, fieldID)
path = path + "/mock"
binLogPaths[0] = &datapb.Binlog{
EntriesNum: 10000,
LogPath: path,
}
seg2 := createSegment(9, 1, 0, 100, 40, "vchan2", commonpb.SegmentState_Flushed)
seg2.Binlogs = append(seg2.Binlogs, &datapb.FieldBinlog{
FieldID: fieldID,
Binlogs: binLogPaths,
})
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg2))
assert.NoError(t, err)
// make sure collection is indexed
err = svr.meta.CreateIndex(&model.Index{
TenantID: "",
CollectionID: 1,
FieldID: 2,
IndexID: 1,
IndexName: "_default_idx_2",
IsDeleted: false,
CreateTime: 0,
TypeParams: nil,
IndexParams: nil,
IsAutoIndex: false,
UserIndexParams: nil,
})
assert.NoError(t, err)
svr.meta.segments.SetSegmentIndex(seg2.ID, &model.SegmentIndex{
SegmentID: seg2.ID,
CollectionID: 1,
PartitionID: 0,
NumRows: 100,
IndexID: 1,
BuildID: 0,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreateTime: 0,
IndexFileKeys: nil,
IndexSize: 0,
})
req = &datapb.GetRecoveryInfoRequestV2{
CollectionID: 1,
}
resp, err = svr.GetRecoveryInfoV2(context.TODO(), req)
assert.NoError(t, err)
assert.True(t, resp.Status.ErrorCode == commonpb.ErrorCode_UnexpectedError)
})
t.Run("with continuous compaction", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)


@ -730,18 +730,12 @@ func (kc *Catalog) GcConfirm(ctx context.Context, collectionID, partitionID type
func fillLogPathByLogID(chunkManagerRootPath string, binlogType storage.BinlogType, collectionID, partitionID,
segmentID typeutil.UniqueID, fieldBinlog *datapb.FieldBinlog,
) error {
) {
for _, binlog := range fieldBinlog.Binlogs {
path, err := buildLogPath(chunkManagerRootPath, binlogType, collectionID, partitionID,
path := buildLogPath(chunkManagerRootPath, binlogType, collectionID, partitionID,
segmentID, fieldBinlog.GetFieldID(), binlog.GetLogID())
if err != nil {
return err
}
binlog.LogPath = path
}
return nil
}
func fillLogIDByLogPath(multiFieldBinlogs ...[]*datapb.FieldBinlog) error {
@ -768,42 +762,85 @@ func fillLogIDByLogPath(multiFieldBinlogs ...[]*datapb.FieldBinlog) error {
return nil
}
// build a binlog path on the storage by metadata
func buildLogPath(chunkManagerRootPath string, binlogType storage.BinlogType, collectionID, partitionID, segmentID, filedID, logID typeutil.UniqueID) (string, error) {
switch binlogType {
case storage.InsertBinlog:
path := metautil.BuildInsertLogPath(chunkManagerRootPath, collectionID, partitionID, segmentID, filedID, logID)
return path, nil
case storage.DeleteBinlog:
path := metautil.BuildDeltaLogPath(chunkManagerRootPath, collectionID, partitionID, segmentID, logID)
return path, nil
case storage.StatsBinlog:
path := metautil.BuildStatsLogPath(chunkManagerRootPath, collectionID, partitionID, segmentID, filedID, logID)
return path, nil
default:
return "", fmt.Errorf("invalid binlog type: %d", binlogType)
func CompressBinLog(fieldBinLogs []*datapb.FieldBinlog) ([]*datapb.FieldBinlog, error) {
compressedFieldBinLogs := make([]*datapb.FieldBinlog, 0)
for _, fieldBinLog := range fieldBinLogs {
compressedFieldBinLog := &datapb.FieldBinlog{}
compressedFieldBinLog.FieldID = fieldBinLog.FieldID
for _, binlog := range fieldBinLog.Binlogs {
logPath := binlog.LogPath
idx := strings.LastIndex(logPath, "/")
if idx == -1 {
return nil, fmt.Errorf("invailed binlog path: %s", logPath)
}
logPathStr := logPath[(idx + 1):]
logID, err := strconv.ParseInt(logPathStr, 10, 64)
if err != nil {
return nil, err
}
binlog := &datapb.Binlog{
EntriesNum: binlog.EntriesNum,
// remove timestamp since it's not necessary
LogSize: binlog.LogSize,
LogID: logID,
}
compressedFieldBinLog.Binlogs = append(compressedFieldBinLog.Binlogs, binlog)
}
compressedFieldBinLogs = append(compressedFieldBinLogs, compressedFieldBinLog)
}
return compressedFieldBinLogs, nil
}
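
A minimal usage sketch (the path is made up but follows the trailing-log-ID layout the parser expects):

logs := []*datapb.FieldBinlog{{
	FieldID: 101,
	Binlogs: []*datapb.Binlog{{LogPath: "files/insert_log/1/2/3/101/4567"}},
}}
compressed, err := CompressBinLog(logs)
// err == nil; compressed[0].Binlogs[0].LogID == 4567 and its LogPath is empty,
// so only the small numeric ID is stored and sent instead of the whole path.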
func checkBinlogs(binlogType storage.BinlogType, segmentID typeutil.UniqueID, logs []*datapb.FieldBinlog) {
check := func(getSegmentID func(logPath string) typeutil.UniqueID) {
func DecompressBinLog(path string, info *datapb.SegmentInfo) error {
for _, fieldBinLogs := range info.GetBinlogs() {
fillLogPathByLogID(path, storage.InsertBinlog, info.CollectionID, info.PartitionID, info.ID, fieldBinLogs)
}
for _, deltaLogs := range info.GetDeltalogs() {
fillLogPathByLogID(path, storage.DeleteBinlog, info.CollectionID, info.PartitionID, info.ID, deltaLogs)
}
for _, statsLogs := range info.GetStatslogs() {
fillLogPathByLogID(path, storage.StatsBinlog, info.CollectionID, info.PartitionID, info.ID, statsLogs)
}
return nil
}
// build a binlog path on the storage by metadata
func buildLogPath(chunkManagerRootPath string, binlogType storage.BinlogType, collectionID, partitionID, segmentID, filedID, logID typeutil.UniqueID) string {
switch binlogType {
case storage.InsertBinlog:
return metautil.BuildInsertLogPath(chunkManagerRootPath, collectionID, partitionID, segmentID, filedID, logID)
case storage.DeleteBinlog:
return metautil.BuildDeltaLogPath(chunkManagerRootPath, collectionID, partitionID, segmentID, logID)
case storage.StatsBinlog:
return metautil.BuildStatsLogPath(chunkManagerRootPath, collectionID, partitionID, segmentID, filedID, logID)
}
// should not happen
log.Panic("invalid binlog type", zap.Any("type", binlogType))
return ""
}
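
For reference, the path shapes the metautil builders are assumed to produce (prefixes and ID positions illustrative):

// insert: <root>/insert_log/<collectionID>/<partitionID>/<segmentID>/<fieldID>/<logID>
// delta:  <root>/delta_log/<collectionID>/<partitionID>/<segmentID>/<logID>
// stats:  <root>/stats_log/<collectionID>/<partitionID>/<segmentID>/<fieldID>/<logID>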
func checkBinlogs(binlogType storage.BinlogType, segmentID typeutil.UniqueID, logs []*datapb.FieldBinlog) error {
check := func(getSegmentID func(logPath string) typeutil.UniqueID) error {
for _, fieldBinlog := range logs {
for _, binlog := range fieldBinlog.Binlogs {
if segmentID != getSegmentID(binlog.LogPath) {
log.Panic("the segment path doesn't match the segmentID", zap.Int64("segmentID", segmentID), zap.String("path", binlog.LogPath))
return fmt.Errorf("the segment path doesn't match the segmentID, segmentID %d, path %s", segmentID, binlog.LogPath)
}
}
}
return nil
}
switch binlogType {
case storage.InsertBinlog:
check(metautil.GetSegmentIDFromInsertLogPath)
return check(metautil.GetSegmentIDFromInsertLogPath)
case storage.DeleteBinlog:
check(metautil.GetSegmentIDFromDeltaLogPath)
return check(metautil.GetSegmentIDFromDeltaLogPath)
case storage.StatsBinlog:
check(metautil.GetSegmentIDFromStatsLogPath)
return check(metautil.GetSegmentIDFromStatsLogPath)
default:
log.Panic("invalid binlog type")
return fmt.Errorf("the segment path doesn't match the segmentID, segmentID %d, type %d", segmentID, binlogType)
}
}
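
A quick sketch of the failure this now reports instead of panicking (path made up, assuming the insert-log layout above):

logs := []*datapb.FieldBinlog{{
	FieldID: 1,
	Binlogs: []*datapb.Binlog{{LogPath: "files/insert_log/1/2/999/1/7"}}, // segment 999
}}
err := checkBinlogs(storage.InsertBinlog, 3, logs) // caller expected segment 3
// err: "the segment path doesn't match the segmentID, segmentID 3, path files/insert_log/1/2/999/1/7"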
@ -820,9 +857,18 @@ func hasSepcialStatslog(logs *datapb.FieldBinlog) bool {
func buildBinlogKvsWithLogID(collectionID, partitionID, segmentID typeutil.UniqueID,
binlogs, deltalogs, statslogs []*datapb.FieldBinlog, ignoreNumberCheck bool,
) (map[string]string, error) {
checkBinlogs(storage.InsertBinlog, segmentID, binlogs)
err := checkBinlogs(storage.InsertBinlog, segmentID, binlogs)
if err != nil {
return nil, err
}
checkBinlogs(storage.DeleteBinlog, segmentID, deltalogs)
err = checkBinlogs(storage.DeleteBinlog, segmentID, deltalogs)
if err != nil {
return nil, err
}
checkBinlogs(storage.StatsBinlog, segmentID, statslogs)
err = checkBinlogs(storage.StatsBinlog, segmentID, statslogs)
if err != nil {
return nil, err
}
// check stats log and bin log size match
// num of stats logs may be one more than num of binlogs if the segment flushed and merged its stats logs
if !ignoreNumberCheck && len(binlogs) != 0 && len(statslogs) != 0 && !hasSepcialStatslog(statslogs[0]) {


@ -277,9 +277,9 @@ func Test_AddSegments(t *testing.T) {
metakv.EXPECT().MultiSave(mock.Anything).Return(errors.New("error")).Maybe()
catalog := NewCatalog(metakv, rootPath, "")
assert.Panics(t, func() {
catalog.AddSegment(context.TODO(), invalidSegment)
})
err := catalog.AddSegment(context.TODO(), invalidSegment)
assert.Error(t, err)
})
t.Run("save error", func(t *testing.T) {
@ -327,11 +327,10 @@ func Test_AlterSegments(t *testing.T) {
metakv.EXPECT().MultiSave(mock.Anything).Return(errors.New("error")).Maybe()
catalog := NewCatalog(metakv, rootPath, "")
assert.Panics(t, func() {
catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{invalidSegment}, metastore.BinlogsIncrement{
Segment: invalidSegment,
})
err := catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{invalidSegment}, metastore.BinlogsIncrement{
Segment: invalidSegment,
})
assert.Error(t, err)
})
t.Run("save error", func(t *testing.T) {
@ -1059,6 +1058,54 @@ func TestCatalog_DropSegmentIndex(t *testing.T) {
})
}
func TestCatalog_Compress(t *testing.T) {
segmentInfo := getSegment(rootPath, 0, 1, 2, 3, 10000)
val, err := proto.Marshal(segmentInfo)
assert.NoError(t, err)
compressedSegmentInfo := proto.Clone(segmentInfo).(*datapb.SegmentInfo)
compressedSegmentInfo.Binlogs, err = CompressBinLog(compressedSegmentInfo.Binlogs)
assert.NoError(t, err)
compressedSegmentInfo.Deltalogs, err = CompressBinLog(compressedSegmentInfo.Deltalogs)
assert.NoError(t, err)
compressedSegmentInfo.Statslogs, err = CompressBinLog(compressedSegmentInfo.Statslogs)
assert.NoError(t, err)
valCompressed, err := proto.Marshal(compressedSegmentInfo)
assert.NoError(t, err)
assert.True(t, len(valCompressed) < len(val))
// make sure decompression restores the original log paths
unmarshaledSegmentInfo := &datapb.SegmentInfo{}
proto.Unmarshal(val, unmarshaledSegmentInfo)
unmarshaledSegmentInfoCompressed := &datapb.SegmentInfo{}
proto.Unmarshal(valCompressed, unmarshaledSegmentInfoCompressed)
DecompressBinLog(rootPath, unmarshaledSegmentInfoCompressed)
assert.Equal(t, len(unmarshaledSegmentInfo.GetBinlogs()), len(unmarshaledSegmentInfoCompressed.GetBinlogs()))
for i := 0; i < 1000; i++ {
assert.Equal(t, unmarshaledSegmentInfo.GetBinlogs()[0].Binlogs[i].LogPath, unmarshaledSegmentInfoCompressed.GetBinlogs()[0].Binlogs[i].LogPath)
}
// test compress error path: a path without a trailing numeric log ID must fail
fakeBinlogs := make([]*datapb.Binlog, 1)
fakeBinlogs[0] = &datapb.Binlog{
EntriesNum: 10000,
LogPath: "test",
}
fieldBinLogs := make([]*datapb.FieldBinlog, 1)
fieldBinLogs[0] = &datapb.FieldBinlog{
FieldID: 106,
Binlogs: fakeBinlogs,
}
compressedSegmentInfo.Binlogs, err = CompressBinLog(fieldBinLogs)
assert.Error(t, err)
// test decompress error path
}
func BenchmarkCatalog_List1000Segments(b *testing.B) {
paramtable.Init()
etcdCli, err := etcd.GetEtcdClient(
@ -1140,6 +1187,58 @@ func addSegment(rootPath string, collectionID, partitionID, segmentID, fieldID i
},
},
}
statslogs = []*datapb.FieldBinlog{
{
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
EntriesNum: 5,
LogPath: metautil.BuildStatsLogPath(rootPath, collectionID, partitionID, segmentID, fieldID, int64(rand.Int())),
},
},
},
}
return &datapb.SegmentInfo{
ID: segmentID,
CollectionID: collectionID,
PartitionID: partitionID,
NumOfRows: 10000,
State: commonpb.SegmentState_Flushed,
Binlogs: binlogs,
Deltalogs: deltalogs,
Statslogs: statslogs,
}
}
func getSegment(rootPath string, collectionID, partitionID, segmentID, fieldID int64, binlogNum int) *datapb.SegmentInfo {
binLogPaths := make([]*datapb.Binlog, binlogNum)
for i := 0; i < binlogNum; i++ {
binLogPaths[i] = &datapb.Binlog{
EntriesNum: 10000,
LogPath: metautil.BuildInsertLogPath(rootPath, collectionID, partitionID, segmentID, fieldID, int64(i)),
}
}
binlogs = []*datapb.FieldBinlog{
{
FieldID: fieldID,
Binlogs: binLogPaths,
},
}
deltalogs = []*datapb.FieldBinlog{
{
FieldID: fieldID,
Binlogs: []*datapb.Binlog{
{
EntriesNum: 5,
LogPath: metautil.BuildDeltaLogPath(rootPath, collectionID, partitionID, segmentID, int64(rand.Int())),
},
},
},
}
statslogs = []*datapb.FieldBinlog{
{
FieldID: 1,


@ -26,9 +26,11 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
@ -147,6 +149,11 @@ func (broker *CoordinatorBroker) GetRecoveryInfoV2(ctx context.Context, collecti
return nil, nil, err
}
path := params.Params.MinioCfg.RootPath.GetValue()
// refill log paths from the compressed log IDs
for _, segmentInfo := range recoveryInfo.Segments {
datacoord.DecompressBinLog(path, segmentInfo)
}
return recoveryInfo.Channels, recoveryInfo.Segments, nil
}


@ -46,7 +46,7 @@ const (
DefaultMaxAttempts = 10
DefaultInitialBackoff float64 = 0.2
DefaultMaxBackoff float64 = 10
DefaultCompressionEnabled bool = false
DefaultCompressionEnabled bool = true
ProxyInternalPort = 19529
ProxyExternalPort = 19530