mirror of https://github.com/milvus-io/milvus.git
Fix Parse binlog path failure (#18583)
Signed-off-by: yun.zhang <yun.zhang@zilliz.com>
pull/18325/head
parent
4d46aa5331
commit
efa5dfaa7b
|
@ -21,6 +21,8 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/milvus-io/milvus/internal/common"
|
||||||
|
|
||||||
"github.com/milvus-io/milvus/internal/log"
|
"github.com/milvus-io/milvus/internal/log"
|
||||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||||
|
@ -138,8 +140,8 @@ func (gc *garbageCollector) scan() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
segmentID, err := storage.ParseSegmentIDByBinlog(infoKey)
|
segmentID, err := storage.ParseSegmentIDByBinlog(gc.option.rootPath, infoKey)
|
||||||
if err != nil {
|
if err != nil && !common.IsIgnorableError(err) {
|
||||||
log.Error("parse segment id error", zap.String("infoKey", infoKey), zap.Error(err))
|
log.Error("parse segment id error", zap.String("infoKey", infoKey), zap.Error(err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,15 +4,32 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"github.com/milvus-io/milvus/internal/common"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ParseSegmentIDByBinlog parse segment id from binlog paths
|
// ParseSegmentIDByBinlog parse segment id from binlog paths
|
||||||
// if path format is not expected, returns error
|
// if path format is not expected, returns error
|
||||||
func ParseSegmentIDByBinlog(path string) (UniqueID, error) {
|
func ParseSegmentIDByBinlog(rootPath, path string) (UniqueID, error) {
|
||||||
// binlog path should consist of "[prefix]/insertLog/collID/partID/segID/fieldID/fileName"
|
// check path contains rootPath as prefix
|
||||||
keyStr := strings.Split(path, "/")
|
if !strings.HasPrefix(path, rootPath) {
|
||||||
if len(keyStr) != 7 {
|
return 0, fmt.Errorf("path \"%s\" does not contains rootPath \"%s\"", path, rootPath)
|
||||||
return 0, fmt.Errorf("%s is not a valid binlog path", path)
|
|
||||||
}
|
}
|
||||||
return strconv.ParseInt(keyStr[len(keyStr)-3], 10, 64)
|
p := path[len(rootPath):]
|
||||||
|
|
||||||
|
// remove leading "/"
|
||||||
|
for strings.HasPrefix(p, "/") {
|
||||||
|
p = p[1:]
|
||||||
|
}
|
||||||
|
|
||||||
|
// binlog path should consist of "[log_type]/collID/partID/segID/fieldID/fileName"
|
||||||
|
keyStr := strings.Split(p, "/")
|
||||||
|
if len(keyStr) == 5 {
|
||||||
|
return 0, common.NewIgnorableError(fmt.Errorf("%s does not contains a file name", path))
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(keyStr) == 6 {
|
||||||
|
return strconv.ParseInt(keyStr[len(keyStr)-3], 10, 64)
|
||||||
|
}
|
||||||
|
return 0, fmt.Errorf("%s is not a valid binlog path", path)
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,53 +3,71 @@ package storage
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/milvus-io/milvus/internal/common"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestParseSegmentIDByBinlog(t *testing.T) {
|
func TestParseSegmentIDByBinlog(t *testing.T) {
|
||||||
|
|
||||||
type testCase struct {
|
type testCase struct {
|
||||||
name string
|
name string
|
||||||
input string
|
input string
|
||||||
expectError bool
|
rootPath string
|
||||||
expectID UniqueID
|
expectError bool
|
||||||
|
expectID UniqueID
|
||||||
|
isIgnorableError bool
|
||||||
}
|
}
|
||||||
|
|
||||||
cases := []testCase{
|
cases := []testCase{
|
||||||
{
|
{
|
||||||
name: "normal case",
|
name: "normal case",
|
||||||
input: "files/insertLog/123/456/1/101/10000001",
|
input: "files/insertLog/123/456/1/101/10000001",
|
||||||
|
rootPath: "files",
|
||||||
expectError: false,
|
expectError: false,
|
||||||
expectID: 1,
|
expectID: 1,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "normal case long id",
|
name: "normal case long id",
|
||||||
input: "files/insertLog/123/456/434828745294479362/101/10000001",
|
input: "files/insertLog/123/456/434828745294479362/101/10000001",
|
||||||
|
rootPath: "files",
|
||||||
expectError: false,
|
expectError: false,
|
||||||
expectID: 434828745294479362,
|
expectID: 434828745294479362,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "bad format",
|
name: "bad format",
|
||||||
input: "files/123",
|
input: "files/123",
|
||||||
|
rootPath: "files",
|
||||||
expectError: true,
|
expectError: true,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "empty input",
|
name: "empty input",
|
||||||
input: "",
|
input: "",
|
||||||
|
rootPath: "files",
|
||||||
expectError: true,
|
expectError: true,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "non-number segmentid",
|
name: "non-number segmentid",
|
||||||
input: "files/insertLog/123/456/segment_id/101/10000001",
|
input: "files/insertLog/123/456/segment_id/101/10000001",
|
||||||
|
rootPath: "files",
|
||||||
|
expectError: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "file name doesn't exists",
|
||||||
|
input: "tenant1/files/delta_log/609/610/457/793",
|
||||||
|
rootPath: "tenant1/files",
|
||||||
expectError: true,
|
expectError: true,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range cases {
|
for _, tc := range cases {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
id, err := ParseSegmentIDByBinlog(tc.input)
|
id, err := ParseSegmentIDByBinlog(tc.rootPath, tc.input)
|
||||||
if tc.expectError {
|
if tc.expectError {
|
||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
|
if tc.isIgnorableError {
|
||||||
|
assert.True(t, common.IsIgnorableError(err))
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.Equal(t, tc.expectID, id)
|
assert.Equal(t, tc.expectID, id)
|
||||||
|
|
Loading…
Reference in New Issue