mirror of https://github.com/milvus-io/milvus.git
fix: [2.3]parse logID from logPath if copyDeltalog find logID not provided (#29276)
Cherry-pick from master pr: #29273 See also: #29272 This PR add `getDeltaLogID` to safely return logID when Binlog struct has zero value logID. It parses logID from logPath if the format is valid. Otherwise, this function shall return error. --------- Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/29386/head
parent
26b1853c54
commit
5ba0f476d5
|
@ -21,6 +21,8 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -41,6 +43,7 @@ import (
|
|||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/metrics"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/metautil"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
"github.com/milvus-io/milvus/pkg/util/timerecord"
|
||||
|
@ -1081,7 +1084,12 @@ func (m *meta) copyDeltaFiles(binlogs []*datapb.FieldBinlog, collectionID, parti
|
|||
for _, fieldBinlog := range binlogs {
|
||||
fieldBinlog = proto.Clone(fieldBinlog).(*datapb.FieldBinlog)
|
||||
for _, binlog := range fieldBinlog.Binlogs {
|
||||
blobKey := metautil.JoinIDPath(collectionID, partitionID, targetSegmentID, binlog.LogID)
|
||||
logID, err := getDeltaLogID(m.chunkManager.RootPath(), binlog)
|
||||
if err != nil {
|
||||
log.Error("failed to get logID from binlog", zap.Int64("segmentID", targetSegmentID), zap.Stringer("binlog", binlog), zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
blobKey := metautil.JoinIDPath(collectionID, partitionID, targetSegmentID, logID)
|
||||
blobPath := path.Join(m.chunkManager.RootPath(), common.SegmentDeltaLogPath, blobKey)
|
||||
blob, err := m.chunkManager.Read(m.ctx, binlog.LogPath)
|
||||
if err != nil {
|
||||
|
@ -1098,6 +1106,34 @@ func (m *meta) copyDeltaFiles(binlogs []*datapb.FieldBinlog, collectionID, parti
|
|||
return ret, nil
|
||||
}
|
||||
|
||||
// getDeltaLogID is the util function to return delta logID from datapb.Binlog
|
||||
// if LogID field is filled, return field value directly.
|
||||
// otherwise, try to parse logID from LogPath.
|
||||
func getDeltaLogID(rootPath string, binlog *datapb.Binlog) (int64, error) {
|
||||
if binlog.GetLogID() != 0 {
|
||||
return binlog.GetLogID(), nil
|
||||
}
|
||||
|
||||
path := binlog.GetLogPath()
|
||||
// check path contains rootPath as prefix
|
||||
if !strings.HasPrefix(path, rootPath) {
|
||||
return 0, fmt.Errorf("path \"%s\" does not contains rootPath \"%s\"", path, rootPath)
|
||||
}
|
||||
p := path[len(rootPath):]
|
||||
// remove leading "/"
|
||||
for strings.HasPrefix(p, "/") {
|
||||
p = p[1:]
|
||||
}
|
||||
|
||||
// delta binlog path should consist of "delta_log/collID/partID/segID/logID"
|
||||
parts := strings.Split(p, "/")
|
||||
if len(parts) != 5 {
|
||||
return 0, merr.WrapErrParameterInvalid("valid delta log path", path)
|
||||
}
|
||||
|
||||
return strconv.ParseInt(parts[4], 10, 64)
|
||||
}
|
||||
|
||||
func (m *meta) alterMetaStoreAfterCompaction(segmentCompactTo *SegmentInfo, segmentsCompactFrom []*SegmentInfo) error {
|
||||
modInfos := make([]*datapb.SegmentInfo, 0, len(segmentsCompactFrom))
|
||||
for _, segment := range segmentsCompactFrom {
|
||||
|
|
|
@ -1022,3 +1022,161 @@ func Test_meta_GcConfirm(t *testing.T) {
|
|||
|
||||
assert.False(t, m.GcConfirm(context.TODO(), 100, 10000))
|
||||
}
|
||||
|
||||
func TestGetDeltaLogID(t *testing.T) {
|
||||
type testCase struct {
|
||||
tag string
|
||||
rootPath string
|
||||
binlog *datapb.Binlog
|
||||
expectErr bool
|
||||
expectID int64
|
||||
}
|
||||
|
||||
cases := []testCase{
|
||||
{
|
||||
tag: "has_log_id",
|
||||
rootPath: "files",
|
||||
binlog: &datapb.Binlog{
|
||||
LogID: 446329278451403166,
|
||||
},
|
||||
expectErr: false,
|
||||
expectID: 446329278451403166,
|
||||
},
|
||||
{
|
||||
tag: "parse_normal_logPath",
|
||||
rootPath: "files",
|
||||
binlog: &datapb.Binlog{
|
||||
LogPath: "files/delta_log/446329278451203130/446329278451203131/446329278451403142/446329278451403166",
|
||||
},
|
||||
expectErr: false,
|
||||
expectID: 446329278451403166,
|
||||
},
|
||||
{
|
||||
tag: "invalid_rootpath_prefix",
|
||||
rootPath: "files",
|
||||
binlog: &datapb.Binlog{
|
||||
LogPath: "file/delta_log/446329278451203130/446329278451203131/446329278451403142/446329278451403166",
|
||||
},
|
||||
expectErr: true,
|
||||
},
|
||||
{
|
||||
tag: "invalid_deltalog_path",
|
||||
rootPath: "files",
|
||||
binlog: &datapb.Binlog{
|
||||
LogPath: "files/delta_log/446329278451403166",
|
||||
},
|
||||
expectErr: true,
|
||||
},
|
||||
{
|
||||
tag: "invalid_logID",
|
||||
rootPath: "files",
|
||||
binlog: &datapb.Binlog{
|
||||
LogPath: "files/delta_log/446329278451203130/446329278451203131/446329278451403142/meta_files",
|
||||
},
|
||||
expectErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.tag, func(t *testing.T) {
|
||||
logID, err := getDeltaLogID(tc.rootPath, tc.binlog)
|
||||
if tc.expectErr {
|
||||
assert.Error(t, err)
|
||||
} else {
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, tc.expectID, logID)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func Test_meta_copyDeltaFiles(t *testing.T) {
|
||||
cm := mocks.NewChunkManager(t)
|
||||
cm.EXPECT().RootPath().Return("files").Maybe()
|
||||
m := &meta{
|
||||
chunkManager: cm,
|
||||
}
|
||||
|
||||
type testCase struct {
|
||||
tag string
|
||||
binlogs []*datapb.FieldBinlog
|
||||
collectionID int64
|
||||
partitionID int64
|
||||
segmentID int64
|
||||
|
||||
expectResult []*datapb.FieldBinlog
|
||||
expectErr bool
|
||||
}
|
||||
|
||||
cases := []*testCase{
|
||||
{
|
||||
tag: "normal_logID",
|
||||
binlogs: []*datapb.FieldBinlog{
|
||||
{
|
||||
Binlogs: []*datapb.Binlog{
|
||||
{LogID: 446329278451403166},
|
||||
},
|
||||
},
|
||||
},
|
||||
collectionID: 446329278451203130,
|
||||
partitionID: 446329278451203131,
|
||||
segmentID: 446329278451403143,
|
||||
expectResult: []*datapb.FieldBinlog{
|
||||
{
|
||||
Binlogs: []*datapb.Binlog{
|
||||
{LogID: 446329278451403166, LogPath: "files/delta_log/446329278451203130/446329278451203131/446329278451403143/446329278451403166"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
tag: "normal_logPath",
|
||||
binlogs: []*datapb.FieldBinlog{
|
||||
{
|
||||
Binlogs: []*datapb.Binlog{
|
||||
{LogPath: "files/delta_log/446329278451203130/446329278451203131/446329278451403142/446329278451403166"},
|
||||
},
|
||||
},
|
||||
},
|
||||
collectionID: 446329278451203130,
|
||||
partitionID: 446329278451203131,
|
||||
segmentID: 446329278451403143,
|
||||
expectResult: []*datapb.FieldBinlog{
|
||||
{
|
||||
Binlogs: []*datapb.Binlog{
|
||||
{LogPath: "files/delta_log/446329278451203130/446329278451203131/446329278451403143/446329278451403166"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
tag: "bad_logPath",
|
||||
binlogs: []*datapb.FieldBinlog{
|
||||
{
|
||||
Binlogs: []*datapb.Binlog{
|
||||
{LogPath: "other_prefix/delta_log/446329278451203130/446329278451203131/446329278451403142/446329278451403166"},
|
||||
},
|
||||
},
|
||||
},
|
||||
collectionID: 446329278451203130,
|
||||
partitionID: 446329278451203131,
|
||||
segmentID: 446329278451403143,
|
||||
expectErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.tag, func(t *testing.T) {
|
||||
cm.EXPECT().Read(mock.Anything, mock.Anything).Return([]byte("test"), nil).Maybe()
|
||||
cm.EXPECT().Write(mock.Anything, mock.Anything, []byte("test")).Return(nil).Maybe()
|
||||
|
||||
result, err := m.copyDeltaFiles(tc.binlogs, tc.collectionID, tc.partitionID, tc.segmentID)
|
||||
if tc.expectErr {
|
||||
assert.Error(t, err)
|
||||
} else {
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, tc.expectResult, result)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue