From 5ba0f476d59ff298eb1642c4e4800f9c7ff719e4 Mon Sep 17 00:00:00 2001 From: congqixia Date: Wed, 27 Dec 2023 14:42:46 +0800 Subject: [PATCH] fix: [2.3]parse logID from logPath if copyDeltalog find logID not provided (#29276) Cherry-pick from master pr: #29273 See also: #29272 This PR add `getDeltaLogID` to safely return logID when Binlog struct has zero value logID. It parses logID from logPath if the format is valid. Otherwise, this function shall return error. --------- Signed-off-by: Congqi Xia --- internal/datacoord/meta.go | 38 +++++++- internal/datacoord/meta_test.go | 158 ++++++++++++++++++++++++++++++++ 2 files changed, 195 insertions(+), 1 deletion(-) diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 2d33d03611..ac083f856a 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -21,6 +21,8 @@ import ( "context" "fmt" "path" + "strconv" + "strings" "sync" "time" @@ -41,6 +43,7 @@ import ( "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metautil" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/timerecord" @@ -1081,7 +1084,12 @@ func (m *meta) copyDeltaFiles(binlogs []*datapb.FieldBinlog, collectionID, parti for _, fieldBinlog := range binlogs { fieldBinlog = proto.Clone(fieldBinlog).(*datapb.FieldBinlog) for _, binlog := range fieldBinlog.Binlogs { - blobKey := metautil.JoinIDPath(collectionID, partitionID, targetSegmentID, binlog.LogID) + logID, err := getDeltaLogID(m.chunkManager.RootPath(), binlog) + if err != nil { + log.Error("failed to get logID from binlog", zap.Int64("segmentID", targetSegmentID), zap.Stringer("binlog", binlog), zap.Error(err)) + return nil, err + } + blobKey := metautil.JoinIDPath(collectionID, partitionID, targetSegmentID, logID) blobPath := path.Join(m.chunkManager.RootPath(), common.SegmentDeltaLogPath, blobKey) blob, err := m.chunkManager.Read(m.ctx, binlog.LogPath) if err != nil { @@ -1098,6 +1106,34 @@ func (m *meta) copyDeltaFiles(binlogs []*datapb.FieldBinlog, collectionID, parti return ret, nil } +// getDeltaLogID is the util function to return delta logID from datapb.Binlog +// if LogID field is filled, return field value directly. +// otherwise, try to parse logID from LogPath. +func getDeltaLogID(rootPath string, binlog *datapb.Binlog) (int64, error) { + if binlog.GetLogID() != 0 { + return binlog.GetLogID(), nil + } + + path := binlog.GetLogPath() + // check path contains rootPath as prefix + if !strings.HasPrefix(path, rootPath) { + return 0, fmt.Errorf("path \"%s\" does not contains rootPath \"%s\"", path, rootPath) + } + p := path[len(rootPath):] + // remove leading "/" + for strings.HasPrefix(p, "/") { + p = p[1:] + } + + // delta binlog path should consist of "delta_log/collID/partID/segID/logID" + parts := strings.Split(p, "/") + if len(parts) != 5 { + return 0, merr.WrapErrParameterInvalid("valid delta log path", path) + } + + return strconv.ParseInt(parts[4], 10, 64) +} + func (m *meta) alterMetaStoreAfterCompaction(segmentCompactTo *SegmentInfo, segmentsCompactFrom []*SegmentInfo) error { modInfos := make([]*datapb.SegmentInfo, 0, len(segmentsCompactFrom)) for _, segment := range segmentsCompactFrom { diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index 87b122c3df..1c4d9ad7fe 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -1022,3 +1022,161 @@ func Test_meta_GcConfirm(t *testing.T) { assert.False(t, m.GcConfirm(context.TODO(), 100, 10000)) } + +func TestGetDeltaLogID(t *testing.T) { + type testCase struct { + tag string + rootPath string + binlog *datapb.Binlog + expectErr bool + expectID int64 + } + + cases := []testCase{ + { + tag: "has_log_id", + rootPath: "files", + binlog: &datapb.Binlog{ + LogID: 446329278451403166, + }, + expectErr: false, + expectID: 446329278451403166, + }, + { + tag: "parse_normal_logPath", + rootPath: "files", + binlog: &datapb.Binlog{ + LogPath: "files/delta_log/446329278451203130/446329278451203131/446329278451403142/446329278451403166", + }, + expectErr: false, + expectID: 446329278451403166, + }, + { + tag: "invalid_rootpath_prefix", + rootPath: "files", + binlog: &datapb.Binlog{ + LogPath: "file/delta_log/446329278451203130/446329278451203131/446329278451403142/446329278451403166", + }, + expectErr: true, + }, + { + tag: "invalid_deltalog_path", + rootPath: "files", + binlog: &datapb.Binlog{ + LogPath: "files/delta_log/446329278451403166", + }, + expectErr: true, + }, + { + tag: "invalid_logID", + rootPath: "files", + binlog: &datapb.Binlog{ + LogPath: "files/delta_log/446329278451203130/446329278451203131/446329278451403142/meta_files", + }, + expectErr: true, + }, + } + + for _, tc := range cases { + t.Run(tc.tag, func(t *testing.T) { + logID, err := getDeltaLogID(tc.rootPath, tc.binlog) + if tc.expectErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, tc.expectID, logID) + } + }) + } +} + +func Test_meta_copyDeltaFiles(t *testing.T) { + cm := mocks.NewChunkManager(t) + cm.EXPECT().RootPath().Return("files").Maybe() + m := &meta{ + chunkManager: cm, + } + + type testCase struct { + tag string + binlogs []*datapb.FieldBinlog + collectionID int64 + partitionID int64 + segmentID int64 + + expectResult []*datapb.FieldBinlog + expectErr bool + } + + cases := []*testCase{ + { + tag: "normal_logID", + binlogs: []*datapb.FieldBinlog{ + { + Binlogs: []*datapb.Binlog{ + {LogID: 446329278451403166}, + }, + }, + }, + collectionID: 446329278451203130, + partitionID: 446329278451203131, + segmentID: 446329278451403143, + expectResult: []*datapb.FieldBinlog{ + { + Binlogs: []*datapb.Binlog{ + {LogID: 446329278451403166, LogPath: "files/delta_log/446329278451203130/446329278451203131/446329278451403143/446329278451403166"}, + }, + }, + }, + }, + { + tag: "normal_logPath", + binlogs: []*datapb.FieldBinlog{ + { + Binlogs: []*datapb.Binlog{ + {LogPath: "files/delta_log/446329278451203130/446329278451203131/446329278451403142/446329278451403166"}, + }, + }, + }, + collectionID: 446329278451203130, + partitionID: 446329278451203131, + segmentID: 446329278451403143, + expectResult: []*datapb.FieldBinlog{ + { + Binlogs: []*datapb.Binlog{ + {LogPath: "files/delta_log/446329278451203130/446329278451203131/446329278451403143/446329278451403166"}, + }, + }, + }, + }, + { + tag: "bad_logPath", + binlogs: []*datapb.FieldBinlog{ + { + Binlogs: []*datapb.Binlog{ + {LogPath: "other_prefix/delta_log/446329278451203130/446329278451203131/446329278451403142/446329278451403166"}, + }, + }, + }, + collectionID: 446329278451203130, + partitionID: 446329278451203131, + segmentID: 446329278451403143, + expectErr: true, + }, + } + + for _, tc := range cases { + t.Run(tc.tag, func(t *testing.T) { + cm.EXPECT().Read(mock.Anything, mock.Anything).Return([]byte("test"), nil).Maybe() + cm.EXPECT().Write(mock.Anything, mock.Anything, []byte("test")).Return(nil).Maybe() + + result, err := m.copyDeltaFiles(tc.binlogs, tc.collectionID, tc.partitionID, tc.segmentID) + if tc.expectErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, tc.expectResult, result) + } + }) + } +}