mirror of https://github.com/milvus-io/milvus.git
Unify rootPath in configs and ChunkManager (#18996)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/19028/head
parent
853793ad40
commit
cf36cebdc8
|
@ -60,3 +60,17 @@ const (
|
|||
// Endian is type alias of binary.LittleEndian.
|
||||
// Milvus uses little endian by default.
|
||||
var Endian = binary.LittleEndian
|
||||
|
||||
const (
|
||||
// SegmentInsertLogPath storage path const for segment insert binlog.
|
||||
SegmentInsertLogPath = `insert_log`
|
||||
|
||||
// SegmentDeltaLogPath storage path const for segment delta log.
|
||||
SegmentDeltaLogPath = `delta_log`
|
||||
|
||||
// SegmentStatslogPath storage path const for segment stats log.
|
||||
SegmentStatslogPath = `stats_log`
|
||||
|
||||
// SegmentIndexPath storage path const for segment index files.
|
||||
SegmentIndexPath = `index_files`
|
||||
)
|
||||
|
|
|
@ -231,7 +231,7 @@ func (b *binlogIO) genDeltaBlobs(data *DeleteData, collID, partID, segID UniqueI
|
|||
return "", nil, err
|
||||
}
|
||||
|
||||
key := path.Join(Params.DataNodeCfg.DeleteBinlogRootPath, k)
|
||||
key := path.Join(b.ChunkManager.RootPath(), common.SegmentDeltaLogPath, k)
|
||||
|
||||
return key, blob.GetValue(), nil
|
||||
}
|
||||
|
@ -262,7 +262,7 @@ func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, meta
|
|||
// Blob Key is generated by Serialize from int64 fieldID in collection schema, which won't raise error in ParseInt
|
||||
fID, _ := strconv.ParseInt(blob.GetKey(), 10, 64)
|
||||
k := JoinIDPath(meta.GetID(), partID, segID, fID, <-generator)
|
||||
key := path.Join(Params.DataNodeCfg.InsertBinlogRootPath, k)
|
||||
key := path.Join(b.ChunkManager.RootPath(), common.SegmentInsertLogPath, k)
|
||||
|
||||
value := blob.GetValue()
|
||||
fileLen := len(value)
|
||||
|
@ -279,7 +279,7 @@ func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, meta
|
|||
fID, _ := strconv.ParseInt(blob.GetKey(), 10, 64)
|
||||
|
||||
k := JoinIDPath(meta.GetID(), partID, segID, fID, <-generator)
|
||||
key := path.Join(Params.DataNodeCfg.StatsBinlogRootPath, k)
|
||||
key := path.Join(b.ChunkManager.RootPath(), common.SegmentStatslogPath, k)
|
||||
|
||||
value := blob.GetValue()
|
||||
fileLen := len(value)
|
||||
|
|
|
@ -369,6 +369,10 @@ type mockCm struct {
|
|||
|
||||
var _ storage.ChunkManager = (*mockCm)(nil)
|
||||
|
||||
func (mk *mockCm) RootPath() string {
|
||||
return "mock_test"
|
||||
}
|
||||
|
||||
func (mk *mockCm) Write(filePath string, content []byte) error {
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -1105,7 +1105,7 @@ func importFlushReqFunc(node *DataNode, req *datapb.ImportTaskRequest, res *root
|
|||
// no error raise if alloc=false
|
||||
k := JoinIDPath(req.GetImportTask().GetCollectionId(), req.GetImportTask().GetPartitionId(), segmentID, fieldID, logidx)
|
||||
|
||||
key := path.Join(Params.DataNodeCfg.InsertBinlogRootPath, k)
|
||||
key := path.Join(node.chunkManager.RootPath(), common.SegmentInsertLogPath, k)
|
||||
kvs[key] = blob.Value[:]
|
||||
field2Insert[fieldID] = &datapb.Binlog{
|
||||
EntriesNum: data.size,
|
||||
|
@ -1131,7 +1131,7 @@ func importFlushReqFunc(node *DataNode, req *datapb.ImportTaskRequest, res *root
|
|||
// no error raise if alloc=false
|
||||
k := JoinIDPath(req.GetImportTask().GetCollectionId(), req.GetImportTask().GetPartitionId(), segmentID, fieldID, logidx)
|
||||
|
||||
key := path.Join(Params.DataNodeCfg.StatsBinlogRootPath, k)
|
||||
key := path.Join(node.chunkManager.RootPath(), common.SegmentStatslogPath, k)
|
||||
kvs[key] = blob.Value
|
||||
field2Stats[fieldID] = &datapb.Binlog{
|
||||
EntriesNum: 0,
|
||||
|
|
|
@ -313,7 +313,6 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
|
|||
testPath := "/test/datanode/root/meta"
|
||||
assert.NoError(t, clearEtcd(testPath))
|
||||
Params.EtcdCfg.MetaRootPath = testPath
|
||||
Params.DataNodeCfg.DeleteBinlogRootPath = testPath
|
||||
|
||||
c := &nodeConfig{
|
||||
replica: replica,
|
||||
|
@ -337,7 +336,6 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
|
|||
testPath := "/test/datanode/root/meta"
|
||||
assert.NoError(t, clearEtcd(testPath))
|
||||
Params.EtcdCfg.MetaRootPath = testPath
|
||||
Params.DataNodeCfg.DeleteBinlogRootPath = testPath
|
||||
|
||||
c := &nodeConfig{
|
||||
replica: replica,
|
||||
|
@ -367,7 +365,6 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
|
|||
testPath := "/test/datanode/root/meta"
|
||||
assert.NoError(t, clearEtcd(testPath))
|
||||
Params.EtcdCfg.MetaRootPath = testPath
|
||||
Params.DataNodeCfg.DeleteBinlogRootPath = testPath
|
||||
|
||||
c := &nodeConfig{
|
||||
replica: replica,
|
||||
|
@ -403,7 +400,6 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
|
|||
testPath := "/test/datanode/root/meta"
|
||||
assert.NoError(t, clearEtcd(testPath))
|
||||
Params.EtcdCfg.MetaRootPath = testPath
|
||||
Params.DataNodeCfg.DeleteBinlogRootPath = testPath
|
||||
|
||||
c := &nodeConfig{
|
||||
replica: &mockReplica{},
|
||||
|
@ -436,7 +432,6 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
|
|||
testPath := "/test/datanode/root/meta"
|
||||
assert.NoError(t, clearEtcd(testPath))
|
||||
Params.EtcdCfg.MetaRootPath = testPath
|
||||
Params.DataNodeCfg.DeleteBinlogRootPath = testPath
|
||||
|
||||
replica := &SegmentReplica{
|
||||
newSegments: make(map[UniqueID]*Segment),
|
||||
|
@ -492,7 +487,6 @@ func TestFlowGraphDeleteNode_showDelBuf(t *testing.T) {
|
|||
testPath := "/test/datanode/root/meta"
|
||||
assert.NoError(t, clearEtcd(testPath))
|
||||
Params.EtcdCfg.MetaRootPath = testPath
|
||||
Params.DataNodeCfg.DeleteBinlogRootPath = testPath
|
||||
|
||||
c := &nodeConfig{
|
||||
replica: &mockReplica{},
|
||||
|
@ -533,7 +527,6 @@ func TestFlowGraphDeleteNode_updateCompactedSegments(t *testing.T) {
|
|||
testPath := "/test/datanode/root/meta"
|
||||
assert.NoError(t, clearEtcd(testPath))
|
||||
Params.EtcdCfg.MetaRootPath = testPath
|
||||
Params.DataNodeCfg.DeleteBinlogRootPath = testPath
|
||||
|
||||
replica := SegmentReplica{
|
||||
newSegments: make(map[UniqueID]*Segment),
|
||||
|
|
|
@ -24,6 +24,7 @@ import (
|
|||
"strconv"
|
||||
"sync"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/common"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/metrics"
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
|
@ -384,7 +385,8 @@ func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID Uni
|
|||
// no error raise if alloc=false
|
||||
k := JoinIDPath(collID, partID, segmentID, fieldID, logidx)
|
||||
|
||||
key := path.Join(Params.DataNodeCfg.InsertBinlogRootPath, k)
|
||||
// [rootPath]/[insert_log]/key
|
||||
key := path.Join(m.ChunkManager.RootPath(), common.SegmentInsertLogPath, k)
|
||||
kvs[key] = blob.Value[:]
|
||||
field2Insert[fieldID] = &datapb.Binlog{
|
||||
EntriesNum: data.size,
|
||||
|
@ -410,7 +412,7 @@ func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID Uni
|
|||
// no error raise if alloc=false
|
||||
k := JoinIDPath(collID, partID, segmentID, fieldID, logidx)
|
||||
|
||||
key := path.Join(Params.DataNodeCfg.StatsBinlogRootPath, k)
|
||||
key := path.Join(m.ChunkManager.RootPath(), common.SegmentStatslogPath, k)
|
||||
kvs[key] = blob.Value
|
||||
field2Stats[fieldID] = &datapb.Binlog{
|
||||
EntriesNum: 0,
|
||||
|
@ -460,7 +462,7 @@ func (m *rendezvousFlushManager) flushDelData(data *DelDataBuf, segmentID Unique
|
|||
}
|
||||
|
||||
blobKey := JoinIDPath(collID, partID, segmentID, logID)
|
||||
blobPath := path.Join(Params.DataNodeCfg.DeleteBinlogRootPath, blobKey)
|
||||
blobPath := path.Join(m.ChunkManager.RootPath(), common.SegmentDeltaLogPath, blobKey)
|
||||
kvs := map[string][]byte{blobPath: blob.Value[:]}
|
||||
data.LogSize = int64(len(blob.Value))
|
||||
data.LogPath = blobPath
|
||||
|
|
|
@ -18,9 +18,11 @@ package indexcoord
|
|||
|
||||
import (
|
||||
"context"
|
||||
"path"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/common"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
"go.uber.org/zap"
|
||||
|
@ -107,7 +109,7 @@ func (gc *garbageCollector) recycleUnusedIndexFiles() {
|
|||
case <-gc.ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
prefix := Params.IndexNodeCfg.IndexStorageRootPath + "/"
|
||||
prefix := path.Join(gc.chunkManager.RootPath(), common.SegmentIndexPath) + "/"
|
||||
// list dir first
|
||||
keys, _, err := gc.chunkManager.ListWithPrefix(prefix, false)
|
||||
if err != nil {
|
||||
|
|
|
@ -80,6 +80,7 @@ func (ib *indexBuilder) refreshTasks(aliveNodes []UniqueID) {
|
|||
metas := ib.meta.GetAllIndexMeta()
|
||||
for build, indexMeta := range metas {
|
||||
// deleted, need to release lock and clean meta
|
||||
|
||||
if indexMeta.MarkDeleted {
|
||||
if indexMeta.NodeID != 0 {
|
||||
ib.tasks[build] = indexTaskDeleted
|
||||
|
@ -183,6 +184,7 @@ func (ib *indexBuilder) process(buildID UniqueID) {
|
|||
ib.tasks[buildID] = indexTaskRetry
|
||||
return
|
||||
}
|
||||
|
||||
req := &indexpb.CreateIndexRequest{
|
||||
IndexBuildID: buildID,
|
||||
IndexName: meta.indexMeta.Req.IndexName,
|
||||
|
|
|
@ -426,6 +426,10 @@ type chunkManagerMock struct {
|
|||
remove func(string) error
|
||||
}
|
||||
|
||||
func (cmm *chunkManagerMock) RootPath() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func (cmm *chunkManagerMock) RemoveWithPrefix(prefix string) error {
|
||||
return cmm.removeWithPrefix(prefix)
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import (
|
|||
"github.com/golang/protobuf/proto"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/common"
|
||||
"github.com/milvus-io/milvus/internal/kv"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/metrics"
|
||||
|
@ -507,7 +508,7 @@ func (it *IndexBuildTask) saveIndex(ctx context.Context, blobs []*storage.Blob)
|
|||
blobCnt := len(blobs)
|
||||
|
||||
getSavePathByKey := func(key string) string {
|
||||
return path.Join(Params.IndexNodeCfg.IndexStorageRootPath, strconv.Itoa(int(it.req.IndexBuildID)), strconv.Itoa(int(it.req.Version)),
|
||||
return path.Join(it.cm.RootPath(), common.SegmentIndexPath, strconv.Itoa(int(it.req.IndexBuildID)), strconv.Itoa(int(it.req.Version)),
|
||||
strconv.Itoa(int(it.partitionID)), strconv.Itoa(int(it.segmentID)), key)
|
||||
}
|
||||
|
||||
|
|
|
@ -89,6 +89,10 @@ func (mcm *mockChunkManager) Read(key string) ([]byte, error) {
|
|||
return mcm.read(key)
|
||||
}
|
||||
|
||||
func (mcm *mockChunkManager) RootPath() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func TestIndexBuildTask_Execute(t *testing.T) {
|
||||
t.Run("task retry", func(t *testing.T) {
|
||||
indexTask := &IndexBuildTask{
|
||||
|
@ -177,6 +181,7 @@ func TestIndexBuildTask_saveIndex(t *testing.T) {
|
|||
DataPaths: []string{"path1", "path2"},
|
||||
Version: 1,
|
||||
},
|
||||
cm: &mockChunkManager{},
|
||||
}
|
||||
|
||||
blobs := []*storage.Blob{
|
||||
|
|
|
@ -52,6 +52,11 @@ func NewLocalChunkManager(opts ...Option) *LocalChunkManager {
|
|||
}
|
||||
}
|
||||
|
||||
// RootPath returns lcm root path.
|
||||
func (lcm *LocalChunkManager) RootPath() string {
|
||||
return lcm.localPath
|
||||
}
|
||||
|
||||
// Path returns the path of local data if exists.
|
||||
func (lcm *LocalChunkManager) Path(filePath string) (string, error) {
|
||||
exist, err := lcm.Exist(filePath)
|
||||
|
|
|
@ -26,6 +26,11 @@ import (
|
|||
)
|
||||
|
||||
func TestLocalCM(t *testing.T) {
|
||||
t.Run("test RootPath", func(t *testing.T) {
|
||||
testCM := NewLocalChunkManager(RootPath(localPath))
|
||||
assert.Equal(t, localPath, testCM.RootPath())
|
||||
})
|
||||
|
||||
t.Run("test load", func(t *testing.T) {
|
||||
testLoadRoot := "test_load"
|
||||
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
|
@ -40,6 +41,7 @@ type MinioChunkManager struct {
|
|||
|
||||
ctx context.Context
|
||||
bucketName string
|
||||
rootPath string
|
||||
}
|
||||
|
||||
var _ ChunkManager = (*MinioChunkManager)(nil)
|
||||
|
@ -104,16 +106,28 @@ func newMinioChunkManagerWithConfig(ctx context.Context, c *config) (*MinioChunk
|
|||
Client: minIOClient,
|
||||
bucketName: c.bucketName,
|
||||
}
|
||||
log.Info("minio chunk manager init success.", zap.String("bucketname", c.bucketName), zap.String("root", c.rootPath))
|
||||
mcm.rootPath = mcm.normalizeRootPath(c.rootPath)
|
||||
log.Info("minio chunk manager init success.", zap.String("bucketname", c.bucketName), zap.String("root", mcm.RootPath()))
|
||||
return mcm, nil
|
||||
}
|
||||
|
||||
// normalizeRootPath
|
||||
func (mcm *MinioChunkManager) normalizeRootPath(rootPath string) string {
|
||||
// no leading "/"
|
||||
return strings.TrimLeft(rootPath, "/")
|
||||
}
|
||||
|
||||
// SetVar set the variable value of mcm
|
||||
func (mcm *MinioChunkManager) SetVar(ctx context.Context, bucketName string) {
|
||||
mcm.ctx = ctx
|
||||
mcm.bucketName = bucketName
|
||||
}
|
||||
|
||||
// RootPath returns minio root path.
|
||||
func (mcm *MinioChunkManager) RootPath() string {
|
||||
return mcm.rootPath
|
||||
}
|
||||
|
||||
// Path returns the path of minio data if exists.
|
||||
func (mcm *MinioChunkManager) Path(filePath string) (string, error) {
|
||||
exist, err := mcm.Exist(filePath)
|
||||
|
|
|
@ -27,13 +27,14 @@ import (
|
|||
)
|
||||
|
||||
// TODO: NewMinioChunkManager is deprecated. Rewrite this unittest.
|
||||
func newMinIOChunkManager(ctx context.Context, bucketName string) (*MinioChunkManager, error) {
|
||||
func newMinIOChunkManager(ctx context.Context, bucketName string, rootPath string) (*MinioChunkManager, error) {
|
||||
endPoint, _ := Params.Load("_MinioAddress")
|
||||
accessKeyID, _ := Params.Load("minio.accessKeyID")
|
||||
secretAccessKey, _ := Params.Load("minio.secretAccessKey")
|
||||
useSSLStr, _ := Params.Load("minio.useSSL")
|
||||
useSSL, _ := strconv.ParseBool(useSSLStr)
|
||||
client, err := NewMinioChunkManager(ctx,
|
||||
RootPath(rootPath),
|
||||
Address(endPoint),
|
||||
AccessKeyID(accessKeyID),
|
||||
SecretAccessKeyID(secretAccessKey),
|
||||
|
@ -45,6 +46,7 @@ func newMinIOChunkManager(ctx context.Context, bucketName string) (*MinioChunkMa
|
|||
)
|
||||
return client, err
|
||||
}
|
||||
|
||||
func TestMinIOCMFail(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
endPoint, _ := Params.Load("9.9.9.9")
|
||||
|
@ -80,10 +82,12 @@ func TestMinIOCM(t *testing.T) {
|
|||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
testCM, err := newMinIOChunkManager(ctx, testBucket)
|
||||
testCM, err := newMinIOChunkManager(ctx, testBucket, testLoadRoot)
|
||||
require.NoError(t, err)
|
||||
defer testCM.RemoveWithPrefix(testLoadRoot)
|
||||
|
||||
assert.Equal(t, testLoadRoot, testCM.RootPath())
|
||||
|
||||
prepareTests := []struct {
|
||||
key string
|
||||
value []byte
|
||||
|
@ -193,7 +197,7 @@ func TestMinIOCM(t *testing.T) {
|
|||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
testCM, err := newMinIOChunkManager(ctx, testBucket)
|
||||
testCM, err := newMinIOChunkManager(ctx, testBucket, testMultiSaveRoot)
|
||||
assert.Nil(t, err)
|
||||
defer testCM.RemoveWithPrefix(testMultiSaveRoot)
|
||||
|
||||
|
@ -218,7 +222,7 @@ func TestMinIOCM(t *testing.T) {
|
|||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
testCM, err := newMinIOChunkManager(ctx, testBucket)
|
||||
testCM, err := newMinIOChunkManager(ctx, testBucket, testRemoveRoot)
|
||||
assert.Nil(t, err)
|
||||
defer testCM.RemoveWithPrefix(testRemoveRoot)
|
||||
|
||||
|
@ -314,7 +318,7 @@ func TestMinIOCM(t *testing.T) {
|
|||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
testCM, err := newMinIOChunkManager(ctx, testBucket)
|
||||
testCM, err := newMinIOChunkManager(ctx, testBucket, testLoadPartialRoot)
|
||||
require.NoError(t, err)
|
||||
defer testCM.RemoveWithPrefix(testLoadPartialRoot)
|
||||
|
||||
|
@ -362,7 +366,7 @@ func TestMinIOCM(t *testing.T) {
|
|||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
testCM, err := newMinIOChunkManager(ctx, testBucket)
|
||||
testCM, err := newMinIOChunkManager(ctx, testBucket, testGetSizeRoot)
|
||||
require.NoError(t, err)
|
||||
defer testCM.RemoveWithPrefix(testGetSizeRoot)
|
||||
|
||||
|
@ -388,7 +392,7 @@ func TestMinIOCM(t *testing.T) {
|
|||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
testCM, err := newMinIOChunkManager(ctx, testBucket)
|
||||
testCM, err := newMinIOChunkManager(ctx, testBucket, testGetPathRoot)
|
||||
require.NoError(t, err)
|
||||
defer testCM.RemoveWithPrefix(testGetPathRoot)
|
||||
|
||||
|
@ -414,7 +418,7 @@ func TestMinIOCM(t *testing.T) {
|
|||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
testCM, err := newMinIOChunkManager(ctx, testBucket)
|
||||
testCM, err := newMinIOChunkManager(ctx, testBucket, testMmapRoot)
|
||||
require.NoError(t, err)
|
||||
defer testCM.RemoveWithPrefix(testMmapRoot)
|
||||
|
||||
|
@ -435,7 +439,7 @@ func TestMinIOCM(t *testing.T) {
|
|||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
testCM, err := newMinIOChunkManager(ctx, testBucket)
|
||||
testCM, err := newMinIOChunkManager(ctx, testBucket, testPrefix)
|
||||
require.NoError(t, err)
|
||||
defer testCM.RemoveWithPrefix(testPrefix)
|
||||
|
||||
|
@ -492,3 +496,44 @@ func TestMinIOCM(t *testing.T) {
|
|||
assert.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestMinioChunkManager_normalizeRootPath(t *testing.T) {
|
||||
type testCase struct {
|
||||
input string
|
||||
expected string
|
||||
}
|
||||
|
||||
cases := []testCase{
|
||||
{
|
||||
input: "files",
|
||||
expected: "files",
|
||||
},
|
||||
{
|
||||
input: "files/",
|
||||
expected: "files/",
|
||||
},
|
||||
{
|
||||
input: "/files",
|
||||
expected: "files",
|
||||
},
|
||||
{
|
||||
input: "//files",
|
||||
expected: "files",
|
||||
},
|
||||
{
|
||||
input: "files/my-folder",
|
||||
expected: "files/my-folder",
|
||||
},
|
||||
{
|
||||
input: "",
|
||||
expected: "",
|
||||
},
|
||||
}
|
||||
|
||||
mcm := &MinioChunkManager{}
|
||||
for _, test := range cases {
|
||||
t.Run(test.input, func(t *testing.T) {
|
||||
assert.Equal(t, test.expected, mcm.normalizeRootPath(test.input))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,6 +26,8 @@ type FileReader interface {
|
|||
// ChunkManager is to manager chunks.
|
||||
// Include Read, Write, Remove chunks.
|
||||
type ChunkManager interface {
|
||||
// RootPath returns current root path.
|
||||
RootPath() string
|
||||
// Path returns path of @filePath.
|
||||
Path(filePath string) (string, error)
|
||||
// Size returns path of @filePath.
|
||||
|
|
|
@ -116,7 +116,12 @@ func (vcm *VectorChunkManager) deserializeVectorFile(filePath string, content []
|
|||
return results, nil
|
||||
}
|
||||
|
||||
// GetPath returns the path of vector data. If cached, return local path.
|
||||
// RootPath returns vector root path
|
||||
func (vcm *VectorChunkManager) RootPath() string {
|
||||
return vcm.vectorStorage.RootPath()
|
||||
}
|
||||
|
||||
// Path returns the path of vector data. If cached, return local path.
|
||||
// If not cached return remote path.
|
||||
func (vcm *VectorChunkManager) Path(filePath string) (string, error) {
|
||||
return vcm.vectorStorage.Path(filePath)
|
||||
|
|
|
@ -123,7 +123,7 @@ func buildVectorChunkManager(localPath string, localCacheEnable bool) (*VectorCh
|
|||
|
||||
bucketName := "vector-chunk-manager"
|
||||
|
||||
rcm, err := newMinIOChunkManager(ctx, bucketName)
|
||||
rcm, err := newMinIOChunkManager(ctx, bucketName, "")
|
||||
if err != nil {
|
||||
return nil, cancel, err
|
||||
}
|
||||
|
@ -155,13 +155,14 @@ func TestNewVectorChunkManager(t *testing.T) {
|
|||
ctx := context.Background()
|
||||
bucketName := "vector-chunk-manager"
|
||||
|
||||
rcm, err := newMinIOChunkManager(ctx, bucketName)
|
||||
rcm, err := newMinIOChunkManager(ctx, bucketName, "")
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, rcm)
|
||||
lcm := NewLocalChunkManager(RootPath(localPath))
|
||||
|
||||
meta := initMeta()
|
||||
vcm, err := NewVectorChunkManager(lcm, rcm, meta, 16, true)
|
||||
assert.Equal(t, "", vcm.RootPath())
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, vcm)
|
||||
|
||||
|
|
|
@ -31,6 +31,10 @@ type MockChunkManager struct {
|
|||
size int64
|
||||
}
|
||||
|
||||
func (mc *MockChunkManager) RootPath() string {
|
||||
return TempFilesPath
|
||||
}
|
||||
|
||||
func (mc *MockChunkManager) Path(filePath string) (string, error) {
|
||||
return "", nil
|
||||
}
|
||||
|
|
|
@ -14,7 +14,6 @@ package paramtable
|
|||
import (
|
||||
"math"
|
||||
"os"
|
||||
"path"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
@ -1104,10 +1103,8 @@ type dataNodeConfig struct {
|
|||
FlowGraphMaxQueueLength int32
|
||||
FlowGraphMaxParallelism int32
|
||||
FlushInsertBufferSize int64
|
||||
InsertBinlogRootPath string
|
||||
StatsBinlogRootPath string
|
||||
DeleteBinlogRootPath string
|
||||
Alias string // Different datanode in one machine
|
||||
|
||||
Alias string // Different datanode in one machine
|
||||
|
||||
// etcd
|
||||
ChannelWatchSubPath string
|
||||
|
@ -1125,9 +1122,6 @@ func (p *dataNodeConfig) init(base *BaseTable) {
|
|||
p.initFlowGraphMaxQueueLength()
|
||||
p.initFlowGraphMaxParallelism()
|
||||
p.initFlushInsertBufferSize()
|
||||
p.initInsertBinlogRootPath()
|
||||
p.initStatsBinlogRootPath()
|
||||
p.initDeleteBinlogRootPath()
|
||||
p.initIOConcurrency()
|
||||
|
||||
p.initChannelWatchPath()
|
||||
|
@ -1150,31 +1144,6 @@ func (p *dataNodeConfig) initFlushInsertBufferSize() {
|
|||
p.FlushInsertBufferSize = p.Base.ParseInt64("_DATANODE_INSERTBUFSIZE")
|
||||
}
|
||||
|
||||
func (p *dataNodeConfig) initInsertBinlogRootPath() {
|
||||
// GOOSE TODO: rootPath change to TenentID
|
||||
rootPath, err := p.Base.Load("minio.rootPath")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
p.InsertBinlogRootPath = path.Join(rootPath, "insert_log")
|
||||
}
|
||||
|
||||
func (p *dataNodeConfig) initStatsBinlogRootPath() {
|
||||
rootPath, err := p.Base.Load("minio.rootPath")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
p.StatsBinlogRootPath = path.Join(rootPath, "stats_log")
|
||||
}
|
||||
|
||||
func (p *dataNodeConfig) initDeleteBinlogRootPath() {
|
||||
rootPath, err := p.Base.Load("minio.rootPath")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
p.DeleteBinlogRootPath = path.Join(rootPath, "delta_log")
|
||||
}
|
||||
|
||||
func (p *dataNodeConfig) initChannelWatchPath() {
|
||||
p.ChannelWatchSubPath = "channelwatch"
|
||||
}
|
||||
|
@ -1203,7 +1172,7 @@ type indexCoordConfig struct {
|
|||
Address string
|
||||
Port int
|
||||
|
||||
IndexStorageRootPath string
|
||||
MinSegmentNumRowsToEnableIndex int64
|
||||
|
||||
GCInterval time.Duration
|
||||
|
||||
|
@ -1214,17 +1183,11 @@ type indexCoordConfig struct {
|
|||
func (p *indexCoordConfig) init(base *BaseTable) {
|
||||
p.Base = base
|
||||
|
||||
p.initIndexStorageRootPath()
|
||||
p.initGCInterval()
|
||||
}
|
||||
|
||||
// initIndexStorageRootPath initializes the root path of index files.
|
||||
func (p *indexCoordConfig) initIndexStorageRootPath() {
|
||||
rootPath, err := p.Base.Load("minio.rootPath")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
p.IndexStorageRootPath = path.Join(rootPath, "index_files")
|
||||
func (p *indexCoordConfig) initMinSegmentNumRowsToEnableIndex() {
|
||||
p.MinSegmentNumRowsToEnableIndex = p.Base.ParseInt64WithDefault("indexCoord.minSegmentNumRowsToEnableIndex", 1024)
|
||||
}
|
||||
|
||||
func (p *indexCoordConfig) initGCInterval() {
|
||||
|
@ -1244,8 +1207,7 @@ type indexNodeConfig struct {
|
|||
|
||||
Alias string
|
||||
|
||||
IndexStorageRootPath string
|
||||
BuildParallel int
|
||||
BuildParallel int
|
||||
|
||||
CreatedTime time.Time
|
||||
UpdatedTime time.Time
|
||||
|
@ -1254,7 +1216,6 @@ type indexNodeConfig struct {
|
|||
func (p *indexNodeConfig) init(base *BaseTable) {
|
||||
p.Base = base
|
||||
p.NodeID.Store(UniqueID(0))
|
||||
p.initIndexStorageRootPath()
|
||||
p.initBuildParallel()
|
||||
}
|
||||
|
||||
|
@ -1263,14 +1224,6 @@ func (p *indexNodeConfig) InitAlias(alias string) {
|
|||
p.Alias = alias
|
||||
}
|
||||
|
||||
func (p *indexNodeConfig) initIndexStorageRootPath() {
|
||||
rootPath, err := p.Base.Load("minio.rootPath")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
p.IndexStorageRootPath = path.Join(rootPath, "index_files")
|
||||
}
|
||||
|
||||
func (p *indexNodeConfig) initBuildParallel() {
|
||||
p.BuildParallel = p.Base.ParseIntWithDefault("indexNode.scheduler.buildParallel", 1)
|
||||
}
|
||||
|
|
|
@ -13,7 +13,6 @@ package paramtable
|
|||
|
||||
import (
|
||||
"os"
|
||||
"path"
|
||||
"runtime"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -305,18 +304,11 @@ func TestComponentParam(t *testing.T) {
|
|||
size := Params.FlushInsertBufferSize
|
||||
t.Logf("FlushInsertBufferSize: %d", size)
|
||||
|
||||
path1 := Params.InsertBinlogRootPath
|
||||
t.Logf("InsertBinlogRootPath: %s", path1)
|
||||
|
||||
Params.CreatedTime = time.Now()
|
||||
t.Logf("CreatedTime: %v", Params.CreatedTime)
|
||||
|
||||
Params.UpdatedTime = time.Now()
|
||||
t.Logf("UpdatedTime: %v", Params.UpdatedTime)
|
||||
|
||||
assert.Equal(t, path.Join("files", "insert_log"), Params.InsertBinlogRootPath)
|
||||
|
||||
assert.Equal(t, path.Join("files", "stats_log"), Params.StatsBinlogRootPath)
|
||||
})
|
||||
|
||||
t.Run("test indexCoordConfig", func(t *testing.T) {
|
||||
|
@ -331,8 +323,6 @@ func TestComponentParam(t *testing.T) {
|
|||
|
||||
Params.UpdatedTime = time.Now()
|
||||
t.Logf("UpdatedTime: %v", Params.UpdatedTime)
|
||||
|
||||
t.Logf("IndexStorageRootPath: %v", Params.IndexStorageRootPath)
|
||||
})
|
||||
|
||||
t.Run("test indexNodeConfig", func(t *testing.T) {
|
||||
|
@ -353,7 +343,5 @@ func TestComponentParam(t *testing.T) {
|
|||
|
||||
Params.UpdatedTime = time.Now()
|
||||
t.Logf("UpdatedTime: %v", Params.UpdatedTime)
|
||||
|
||||
t.Logf("IndexStorageRootPath: %v", Params.IndexStorageRootPath)
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue