mirror of https://github.com/milvus-io/milvus.git
Make sure that the segment path matches the segment id (#20677)
Signed-off-by: SimFG <bang.fu@zilliz.com> Signed-off-by: SimFG <bang.fu@zilliz.com>pull/20713/head
parent
633a749880
commit
dffcd974ff
|
@ -22,6 +22,9 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/metautil"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/commonpb"
|
||||
memkv "github.com/milvus-io/milvus/internal/kv/mem"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
|
@ -219,6 +222,18 @@ func Test_compactionPlanHandler_execWithParallels(t *testing.T) {
|
|||
assert.Less(t, uint64(2), max-min)
|
||||
}
|
||||
|
||||
func getInsertLogPath(rootPath string, segmentID typeutil.UniqueID) string {
|
||||
return metautil.BuildInsertLogPath(rootPath, 10, 100, segmentID, 1000, 10000)
|
||||
}
|
||||
|
||||
func getStatsLogPath(rootPath string, segmentID typeutil.UniqueID) string {
|
||||
return metautil.BuildStatsLogPath(rootPath, 10, 100, segmentID, 1000, 10000)
|
||||
}
|
||||
|
||||
func getDeltaLogPath(rootPath string, segmentID typeutil.UniqueID) string {
|
||||
return metautil.BuildDeltaLogPath(rootPath, 10, 100, segmentID, 10000)
|
||||
}
|
||||
|
||||
func TestCompactionPlanHandler_handleMergeCompactionResult(t *testing.T) {
|
||||
mockDataNode := &mocks.DataNode{}
|
||||
mockDataNode.EXPECT().SyncSegments(mock.Anything, mock.Anything).Run(func(ctx context.Context, req *datapb.SyncSegmentsRequest) {}).Return(&commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil)
|
||||
|
@ -227,16 +242,16 @@ func TestCompactionPlanHandler_handleMergeCompactionResult(t *testing.T) {
|
|||
|
||||
seg1 := &datapb.SegmentInfo{
|
||||
ID: 1,
|
||||
Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log1")},
|
||||
Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log2")},
|
||||
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log3")},
|
||||
Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getInsertLogPath("log1", 1))},
|
||||
Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getStatsLogPath("log2", 1))},
|
||||
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log3", 1))},
|
||||
}
|
||||
|
||||
seg2 := &datapb.SegmentInfo{
|
||||
ID: 2,
|
||||
Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log4")},
|
||||
Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log5")},
|
||||
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log6")},
|
||||
Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getInsertLogPath("log4", 2))},
|
||||
Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getStatsLogPath("log5", 2))},
|
||||
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log6", 2))},
|
||||
}
|
||||
|
||||
plan := &datapb.CompactionPlan{
|
||||
|
@ -319,18 +334,18 @@ func TestCompactionPlanHandler_handleMergeCompactionResult(t *testing.T) {
|
|||
PlanID: 1,
|
||||
SegmentID: 3,
|
||||
NumOfRows: 15,
|
||||
InsertLogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log301")},
|
||||
Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log302")},
|
||||
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log303")},
|
||||
InsertLogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getInsertLogPath("log301", 3))},
|
||||
Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getStatsLogPath("log302", 3))},
|
||||
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log303", 3))},
|
||||
}
|
||||
|
||||
compactionResult2 := &datapb.CompactionResult{
|
||||
PlanID: 1,
|
||||
SegmentID: 3,
|
||||
NumOfRows: 0,
|
||||
InsertLogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log301")},
|
||||
Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log302")},
|
||||
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log303")},
|
||||
InsertLogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getInsertLogPath("log301", 3))},
|
||||
Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getStatsLogPath("log302", 3))},
|
||||
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log303", 3))},
|
||||
}
|
||||
|
||||
has, err := c.meta.HasSegments([]UniqueID{1, 2})
|
||||
|
@ -385,16 +400,16 @@ func TestCompactionPlanHandler_completeCompaction(t *testing.T) {
|
|||
|
||||
seg1 := &datapb.SegmentInfo{
|
||||
ID: 1,
|
||||
Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log1")},
|
||||
Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log2")},
|
||||
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log3")},
|
||||
Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getInsertLogPath("log1", 1))},
|
||||
Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getStatsLogPath("log2", 1))},
|
||||
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log3", 1))},
|
||||
}
|
||||
|
||||
seg2 := &datapb.SegmentInfo{
|
||||
ID: 2,
|
||||
Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log4")},
|
||||
Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log5")},
|
||||
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log6")},
|
||||
Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getInsertLogPath("log4", 2))},
|
||||
Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getStatsLogPath("log5", 2))},
|
||||
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log6", 2))},
|
||||
}
|
||||
|
||||
plan := &datapb.CompactionPlan{
|
||||
|
@ -448,9 +463,9 @@ func TestCompactionPlanHandler_completeCompaction(t *testing.T) {
|
|||
PlanID: 1,
|
||||
SegmentID: 3,
|
||||
NumOfRows: 15,
|
||||
InsertLogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log301")},
|
||||
Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log302")},
|
||||
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log303")},
|
||||
InsertLogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getInsertLogPath("log301", 3))},
|
||||
Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getStatsLogPath("log302", 3))},
|
||||
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log303", 3))},
|
||||
}
|
||||
|
||||
flushCh := make(chan UniqueID, 1)
|
||||
|
@ -480,16 +495,16 @@ func TestCompactionPlanHandler_completeCompaction(t *testing.T) {
|
|||
|
||||
seg1 := &datapb.SegmentInfo{
|
||||
ID: 1,
|
||||
Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log1")},
|
||||
Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log2")},
|
||||
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log3")},
|
||||
Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getInsertLogPath("log1", 1))},
|
||||
Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getStatsLogPath("log2", 1))},
|
||||
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log3", 1))},
|
||||
}
|
||||
|
||||
seg2 := &datapb.SegmentInfo{
|
||||
ID: 2,
|
||||
Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log4")},
|
||||
Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log5")},
|
||||
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log6")},
|
||||
Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getInsertLogPath("log4", 2))},
|
||||
Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getStatsLogPath("log5", 2))},
|
||||
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log6", 2))},
|
||||
}
|
||||
|
||||
plan := &datapb.CompactionPlan{
|
||||
|
@ -549,9 +564,9 @@ func TestCompactionPlanHandler_completeCompaction(t *testing.T) {
|
|||
PlanID: 1,
|
||||
SegmentID: 3,
|
||||
NumOfRows: 0,
|
||||
InsertLogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log301")},
|
||||
Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log302")},
|
||||
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log303")},
|
||||
InsertLogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getInsertLogPath("log301", 3))},
|
||||
Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getStatsLogPath("log302", 3))},
|
||||
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log303", 3))},
|
||||
}
|
||||
|
||||
flushCh := make(chan UniqueID, 1)
|
||||
|
|
|
@ -308,7 +308,7 @@ func initUtOSSEnv(bucket, root string, n int) (mcm *storage.MinioChunkManager, i
|
|||
if i == 1 {
|
||||
token = path.Join(strconv.Itoa(i), strconv.Itoa(i), "error-seg-id", funcutil.RandomString(8), funcutil.RandomString(8))
|
||||
} else {
|
||||
token = path.Join(strconv.Itoa(i), strconv.Itoa(i), strconv.Itoa(i), funcutil.RandomString(8), funcutil.RandomString(8))
|
||||
token = path.Join(strconv.Itoa(1+i), strconv.Itoa(10+i), strconv.Itoa(100+i), funcutil.RandomString(8), funcutil.RandomString(8))
|
||||
}
|
||||
// insert
|
||||
filePath := path.Join(root, insertLogPrefix, token)
|
||||
|
@ -329,7 +329,7 @@ func initUtOSSEnv(bucket, root string, n int) (mcm *storage.MinioChunkManager, i
|
|||
if i == 1 {
|
||||
token = path.Join(strconv.Itoa(i), strconv.Itoa(i), "error-seg-id", funcutil.RandomString(8))
|
||||
} else {
|
||||
token = path.Join(strconv.Itoa(i), strconv.Itoa(i), strconv.Itoa(i), funcutil.RandomString(8))
|
||||
token = path.Join(strconv.Itoa(1+i), strconv.Itoa(10+i), strconv.Itoa(100+i), funcutil.RandomString(8))
|
||||
}
|
||||
filePath = path.Join(root, deltaLogPrefix, token)
|
||||
info, err = cli.PutObject(context.TODO(), bucket, filePath, reader, int64(len(content)), minio.PutObjectOptions{})
|
||||
|
|
|
@ -526,14 +526,14 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) {
|
|||
meta, err := newMeta(context.TODO(), memkv.NewMemoryKV(), "", nil)
|
||||
assert.Nil(t, err)
|
||||
|
||||
segment1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ID: 1, State: commonpb.SegmentState_Growing, Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "binlog0")},
|
||||
Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statslog0")}}}
|
||||
segment1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ID: 1, State: commonpb.SegmentState_Growing, Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, getInsertLogPath("binlog0", 1))},
|
||||
Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, getStatsLogPath("statslog0", 1))}}}
|
||||
err = meta.AddSegment(segment1)
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = meta.UpdateFlushSegmentsInfo(1, true, false, true, []*datapb.FieldBinlog{getFieldBinlogPaths(1, "binlog1")},
|
||||
[]*datapb.FieldBinlog{getFieldBinlogPaths(1, "statslog1")},
|
||||
[]*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000}}}},
|
||||
err = meta.UpdateFlushSegmentsInfo(1, true, false, true, []*datapb.FieldBinlog{getFieldBinlogPaths(1, getInsertLogPath("binlog1", 1))},
|
||||
[]*datapb.FieldBinlog{getFieldBinlogPaths(1, getStatsLogPath("statslog1", 1))},
|
||||
[]*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: getDeltaLogPath("deltalog1", 1)}}}},
|
||||
[]*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}, []*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &internalpb.MsgPosition{MsgID: []byte{1, 2, 3}}}})
|
||||
assert.Nil(t, err)
|
||||
|
||||
|
@ -598,9 +598,9 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) {
|
|||
}
|
||||
meta.segments.SetSegment(1, segmentInfo)
|
||||
|
||||
err = meta.UpdateFlushSegmentsInfo(1, true, false, false, []*datapb.FieldBinlog{getFieldBinlogPaths(1, "binlog")},
|
||||
[]*datapb.FieldBinlog{getFieldBinlogPaths(1, "statslog")},
|
||||
[]*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000}}}},
|
||||
err = meta.UpdateFlushSegmentsInfo(1, true, false, false, []*datapb.FieldBinlog{getFieldBinlogPaths(1, getInsertLogPath("binlog", 1))},
|
||||
[]*datapb.FieldBinlog{getFieldBinlogPaths(1, getInsertLogPath("statslog", 1))},
|
||||
[]*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: getDeltaLogPath("deltalog", 1)}}}},
|
||||
[]*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}, []*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &internalpb.MsgPosition{MsgID: []byte{1, 2, 3}}}})
|
||||
assert.NotNil(t, err)
|
||||
assert.Equal(t, "mocked fail", err.Error())
|
||||
|
|
|
@ -1150,10 +1150,10 @@ func TestSaveBinlogPaths(t *testing.T) {
|
|||
FieldID: 1,
|
||||
Binlogs: []*datapb.Binlog{
|
||||
{
|
||||
LogPath: "/by-dev/test/0/1/2/1/Allo1",
|
||||
LogPath: "/by-dev/test/0/1/1/1/Allo1",
|
||||
},
|
||||
{
|
||||
LogPath: "/by-dev/test/0/1/2/1/Allo2",
|
||||
LogPath: "/by-dev/test/0/1/1/1/Allo2",
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -1183,8 +1183,8 @@ func TestSaveBinlogPaths(t *testing.T) {
|
|||
assert.NotNil(t, fieldBinlogs)
|
||||
assert.EqualValues(t, 2, len(fieldBinlogs.GetBinlogs()))
|
||||
assert.EqualValues(t, 1, fieldBinlogs.GetFieldID())
|
||||
assert.EqualValues(t, "/by-dev/test/0/1/2/1/Allo1", fieldBinlogs.GetBinlogs()[0].GetLogPath())
|
||||
assert.EqualValues(t, "/by-dev/test/0/1/2/1/Allo2", fieldBinlogs.GetBinlogs()[1].GetLogPath())
|
||||
assert.EqualValues(t, "/by-dev/test/0/1/1/1/Allo1", fieldBinlogs.GetBinlogs()[0].GetLogPath())
|
||||
assert.EqualValues(t, "/by-dev/test/0/1/1/1/Allo2", fieldBinlogs.GetBinlogs()[1].GetLogPath())
|
||||
|
||||
segmentInfo := svr.meta.GetSegment(0)
|
||||
assert.NotNil(t, segmentInfo)
|
||||
|
|
|
@ -260,7 +260,6 @@ func testIndexCoord(t *testing.T) {
|
|||
indexs := ic.metaTable.collectionIndexes
|
||||
ic.metaTable.collectionIndexes = make(map[UniqueID]map[UniqueID]*model.Index)
|
||||
defer func() {
|
||||
fmt.Println("simfg fubang")
|
||||
ic.metaTable.collectionIndexes = indexs
|
||||
}()
|
||||
|
||||
|
@ -278,7 +277,19 @@ func testIndexCoord(t *testing.T) {
|
|||
break
|
||||
}
|
||||
|
||||
indexs := ic.metaTable.segmentIndexes
|
||||
updateSegmentIndexes := func(i map[UniqueID]map[UniqueID]*model.SegmentIndex) {
|
||||
ic.metaTable.indexLock.Lock()
|
||||
ic.metaTable.segmentIndexes = i
|
||||
ic.metaTable.indexLock.Unlock()
|
||||
}
|
||||
|
||||
getSegmentIndexes := func() map[UniqueID]map[UniqueID]*model.SegmentIndex {
|
||||
ic.metaTable.indexLock.RLock()
|
||||
defer ic.metaTable.indexLock.RUnlock()
|
||||
return ic.metaTable.segmentIndexes
|
||||
}
|
||||
|
||||
indexs := getSegmentIndexes()
|
||||
mockIndexs := make(map[UniqueID]map[UniqueID]*model.SegmentIndex)
|
||||
progressIndex := &model.SegmentIndex{
|
||||
IndexState: commonpb.IndexState_InProgress,
|
||||
|
@ -292,9 +303,10 @@ func testIndexCoord(t *testing.T) {
|
|||
IndexState: commonpb.IndexState_Finished,
|
||||
NumRows: 2048,
|
||||
}
|
||||
ic.metaTable.segmentIndexes = mockIndexs
|
||||
|
||||
updateSegmentIndexes(mockIndexs)
|
||||
defer func() {
|
||||
ic.metaTable.segmentIndexes = indexs
|
||||
updateSegmentIndexes(indexs)
|
||||
}()
|
||||
|
||||
mockIndexs[111] = make(map[UniqueID]*model.SegmentIndex)
|
||||
|
|
|
@ -465,9 +465,35 @@ func buildLogPath(chunkManagerRootPath string, binlogType storage.BinlogType, co
|
|||
}
|
||||
}
|
||||
|
||||
func checkBinlogs(binlogType storage.BinlogType, segmentID typeutil.UniqueID, logs []*datapb.FieldBinlog) {
|
||||
check := func(getSegmentID func(logPath string) typeutil.UniqueID) {
|
||||
for _, fieldBinlog := range logs {
|
||||
for _, binlog := range fieldBinlog.Binlogs {
|
||||
if segmentID != getSegmentID(binlog.LogPath) {
|
||||
log.Panic("the segment path doesn't match the segment id", zap.Int64("segment_id", segmentID), zap.String("path", binlog.LogPath))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
switch binlogType {
|
||||
case storage.InsertBinlog:
|
||||
check(metautil.GetSegmentIDFromInsertLogPath)
|
||||
case storage.DeleteBinlog:
|
||||
check(metautil.GetSegmentIDFromDeltaLogPath)
|
||||
case storage.StatsBinlog:
|
||||
check(metautil.GetSegmentIDFromStatsLogPath)
|
||||
default:
|
||||
log.Panic("invalid binlog type")
|
||||
}
|
||||
}
|
||||
|
||||
func buildBinlogKvsWithLogID(collectionID, partitionID, segmentID typeutil.UniqueID,
|
||||
binlogs, deltalogs, statslogs []*datapb.FieldBinlog) (map[string]string, error) {
|
||||
|
||||
checkBinlogs(storage.InsertBinlog, segmentID, binlogs)
|
||||
checkBinlogs(storage.DeleteBinlog, segmentID, deltalogs)
|
||||
checkBinlogs(storage.StatsBinlog, segmentID, statslogs)
|
||||
|
||||
fillLogIDByLogPath(binlogs, deltalogs, statslogs)
|
||||
kvs, err := buildBinlogKvs(collectionID, partitionID, segmentID, binlogs, deltalogs, statslogs)
|
||||
if err != nil {
|
||||
|
|
|
@ -39,6 +39,10 @@ var (
|
|||
deltalogPath = metautil.BuildDeltaLogPath("a", collectionID, partitionID, segmentID, logID)
|
||||
statslogPath = metautil.BuildStatsLogPath("a", collectionID, partitionID, segmentID, fieldID, logID)
|
||||
|
||||
binlogPath2 = metautil.BuildInsertLogPath("a", collectionID, partitionID, segmentID2, fieldID, logID)
|
||||
deltalogPath2 = metautil.BuildDeltaLogPath("a", collectionID, partitionID, segmentID2, logID)
|
||||
statslogPath2 = metautil.BuildStatsLogPath("a", collectionID, partitionID, segmentID2, fieldID, logID)
|
||||
|
||||
k1 = buildFieldBinlogPath(collectionID, partitionID, segmentID, fieldID)
|
||||
k2 = buildFieldDeltalogPath(collectionID, partitionID, segmentID, fieldID)
|
||||
k3 = buildFieldStatslogPath(collectionID, partitionID, segmentID, fieldID)
|
||||
|
@ -114,6 +118,20 @@ var (
|
|||
},
|
||||
}
|
||||
|
||||
getlogs = func(logpath string) []*datapb.FieldBinlog {
|
||||
return []*datapb.FieldBinlog{
|
||||
{
|
||||
FieldID: 1,
|
||||
Binlogs: []*datapb.Binlog{
|
||||
{
|
||||
EntriesNum: 5,
|
||||
LogPath: logpath,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
segment1 = &datapb.SegmentInfo{
|
||||
ID: segmentID,
|
||||
CollectionID: collectionID,
|
||||
|
@ -131,9 +149,9 @@ var (
|
|||
PartitionID: partitionID,
|
||||
NumOfRows: 100,
|
||||
State: commonpb.SegmentState_Dropped,
|
||||
Binlogs: binlogs,
|
||||
Deltalogs: deltalogs,
|
||||
Statslogs: statslogs,
|
||||
Binlogs: getlogs(binlogPath2),
|
||||
Deltalogs: getlogs(deltalogPath2),
|
||||
Statslogs: getlogs(statslogPath2),
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -257,8 +275,9 @@ func Test_AddSegments(t *testing.T) {
|
|||
}
|
||||
|
||||
catalog := &Catalog{txn, "a"}
|
||||
err := catalog.AddSegment(context.TODO(), invalidSegment)
|
||||
assert.Error(t, err)
|
||||
assert.Panics(t, func() {
|
||||
catalog.AddSegment(context.TODO(), invalidSegment)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("save error", func(t *testing.T) {
|
||||
|
@ -299,8 +318,9 @@ func Test_AlterSegments(t *testing.T) {
|
|||
}
|
||||
|
||||
catalog := &Catalog{txn, "a"}
|
||||
err := catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{invalidSegment})
|
||||
assert.Error(t, err)
|
||||
assert.Panics(t, func() {
|
||||
catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{invalidSegment})
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("save error", func(t *testing.T) {
|
||||
|
@ -420,6 +440,7 @@ func Test_AlterSegmentsAndAddNewSegment(t *testing.T) {
|
|||
return []string{}, []string{}, nil
|
||||
}
|
||||
|
||||
// TODO fubang
|
||||
catalog := &Catalog{txn, "a"}
|
||||
err := catalog.AlterSegmentsAndAddNewSegment(context.TODO(), []*datapb.SegmentInfo{droppedSegment}, segment1)
|
||||
assert.NoError(t, err)
|
||||
|
|
|
@ -3,27 +3,56 @@ package metautil
|
|||
import (
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/common"
|
||||
)
|
||||
|
||||
const pathSep = "/"
|
||||
|
||||
func BuildInsertLogPath(rootPath string, collectionID, partitionID, segmentID, fieldID, logID typeutil.UniqueID) string {
|
||||
k := JoinIDPath(collectionID, partitionID, segmentID, fieldID, logID)
|
||||
return path.Join(rootPath, common.SegmentInsertLogPath, k)
|
||||
}
|
||||
|
||||
func GetSegmentIDFromInsertLogPath(logPath string) typeutil.UniqueID {
|
||||
return getSegmentIDFromPath(logPath, 3)
|
||||
}
|
||||
|
||||
func BuildStatsLogPath(rootPath string, collectionID, partitionID, segmentID, fieldID, logID typeutil.UniqueID) string {
|
||||
k := JoinIDPath(collectionID, partitionID, segmentID, fieldID, logID)
|
||||
return path.Join(rootPath, common.SegmentStatslogPath, k)
|
||||
}
|
||||
|
||||
func GetSegmentIDFromStatsLogPath(logPath string) typeutil.UniqueID {
|
||||
return getSegmentIDFromPath(logPath, 3)
|
||||
}
|
||||
|
||||
func BuildDeltaLogPath(rootPath string, collectionID, partitionID, segmentID, logID typeutil.UniqueID) string {
|
||||
k := JoinIDPath(collectionID, partitionID, segmentID, logID)
|
||||
return path.Join(rootPath, common.SegmentDeltaLogPath, k)
|
||||
}
|
||||
|
||||
func GetSegmentIDFromDeltaLogPath(logPath string) typeutil.UniqueID {
|
||||
return getSegmentIDFromPath(logPath, 2)
|
||||
}
|
||||
|
||||
func getSegmentIDFromPath(logPath string, segmentIndex int) typeutil.UniqueID {
|
||||
infos := strings.Split(logPath, pathSep)
|
||||
l := len(infos)
|
||||
if l < segmentIndex {
|
||||
return 0
|
||||
}
|
||||
|
||||
v, err := strconv.ParseInt(infos[l-segmentIndex], 10, 64)
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
// JoinIDPath joins ids to path format.
|
||||
func JoinIDPath(ids ...typeutil.UniqueID) string {
|
||||
idStr := make([]string, 0, len(ids))
|
||||
|
|
Loading…
Reference in New Issue