Replace minio kv with minio chunk manager (#15936)

Signed-off-by: godchen0212 <qingxiang.chen@zilliz.com>
pull/16096/head
godchen 2022-03-17 18:03:23 +08:00 committed by GitHub
parent 3cd28420f1
commit 78557ca6d7
32 changed files with 396 additions and 1273 deletions

View File

@ -17,7 +17,6 @@
package datanode
import (
"bytes"
"context"
"errors"
"path"
@ -25,7 +24,6 @@ import (
"time"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
@ -57,7 +55,7 @@ type uploader interface {
}
type binlogIO struct {
kv.BaseKV
storage.ChunkManager
allocatorInterface
}
@ -67,7 +65,7 @@ var _ uploader = (*binlogIO)(nil)
func (b *binlogIO) download(ctx context.Context, paths []string) ([]*Blob, error) {
var (
err = errStart
vs = []string{}
vs [][]byte
)
g, gCtx := errgroup.WithContext(ctx)
@ -84,7 +82,7 @@ func (b *binlogIO) download(ctx context.Context, paths []string) ([]*Blob, error
log.Warn("downloading failed, retry in 50ms", zap.Strings("paths", paths))
<-time.After(50 * time.Millisecond)
}
vs, err = b.MultiLoad(paths)
vs, err = b.MultiRead(paths)
}
}
return nil
@ -96,7 +94,7 @@ func (b *binlogIO) download(ctx context.Context, paths []string) ([]*Blob, error
rst := make([]*Blob, len(vs))
for i := range rst {
rst[i] = &Blob{Value: bytes.NewBufferString(vs[i]).Bytes()}
rst[i] = &Blob{Value: vs[i]}
}
return rst, nil
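The payoff of swapping MultiLoad for MultiRead shows up right here: MultiLoad returned []string, so every blob paid a bytes-to-string copy on save and a string-to-bytes copy on load, whereas MultiRead returns [][]byte that can be wrapped directly. A minimal before/after, using the names from the hunk above:

```go
// Before: MultiLoad returned []string, forcing a copy back into bytes.
rst[i] = &Blob{Value: bytes.NewBufferString(vs[i]).Bytes()}

// After: MultiRead returns [][]byte, so each slice is wrapped as-is.
rst[i] = &Blob{Value: vs[i]}
```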
@ -117,7 +115,7 @@ func (b *binlogIO) upload(
var (
p = &segPaths{} // the returned paths
kvs = make(map[string]string) // Key values to store in minIO
kvs = make(map[string][]byte) // Key values to store in minIO
insertField2Path = make(map[UniqueID]*datapb.FieldBinlog) // FieldID > its FieldBinlog
statsField2Path = make(map[UniqueID]*datapb.FieldBinlog) // FieldID > its statsBinlog
@ -186,7 +184,7 @@ func (b *binlogIO) upload(
return nil, err
}
kvs[k] = bytes.NewBuffer(v).String()
kvs[k] = v
p.deltaInfo = append(p.deltaInfo, &datapb.FieldBinlog{
FieldID: 0, // TODO: Not useful on deltalogs, FieldID shall be ID of primary key field
Binlogs: []*datapb.Binlog{{
@ -213,7 +211,7 @@ func (b *binlogIO) upload(
zap.Int64("segmentID", segID))
<-time.After(50 * time.Millisecond)
}
err = b.MultiSave(kvs)
err = b.MultiWrite(kvs)
}
}
return p, nil
@ -239,7 +237,7 @@ func (b *binlogIO) genDeltaBlobs(data *DeleteData, collID, partID, segID UniqueI
}
// genInsertBlobs returns kvs, insert-paths, stats-paths
func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, meta *etcdpb.CollectionMeta) (map[string]string, map[UniqueID]*datapb.FieldBinlog, map[UniqueID]*datapb.FieldBinlog, error) {
func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, meta *etcdpb.CollectionMeta) (map[string][]byte, map[UniqueID]*datapb.FieldBinlog, map[UniqueID]*datapb.FieldBinlog, error) {
inCodec := storage.NewInsertCodec(meta)
inlogs, statslogs, err := inCodec.Serialize(partID, segID, data)
if err != nil {
@ -247,7 +245,7 @@ func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, meta
}
var (
kvs = make(map[string]string, len(inlogs)+len(statslogs))
kvs = make(map[string][]byte, len(inlogs)+len(statslogs))
inpaths = make(map[UniqueID]*datapb.FieldBinlog)
statspaths = make(map[UniqueID]*datapb.FieldBinlog)
)
@ -266,7 +264,7 @@ func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, meta
k := JoinIDPath(meta.GetID(), partID, segID, fID, <-generator)
key := path.Join(Params.DataNodeCfg.InsertBinlogRootPath, k)
value := bytes.NewBuffer(blob.GetValue()).String()
value := blob.GetValue()
fileLen := len(value)
kvs[key] = value
@ -283,7 +281,7 @@ func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, meta
k := JoinIDPath(meta.GetID(), partID, segID, fID, <-generator)
key := path.Join(Params.DataNodeCfg.StatsBinlogRootPath, k)
value := bytes.NewBuffer(blob.GetValue()).String()
value := blob.GetValue()
fileLen := len(value)
kvs[key] = value
@ -318,7 +316,3 @@ func (b *binlogIO) idxGenerator(n int, done <-chan struct{}) (<-chan UniqueID, e
return rt, nil
}
func (b *binlogIO) close() {
b.Close()
}
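Taken together, the call sites in this commit (and the mockCm in the test file below) imply roughly the following ChunkManager surface. This is an editor's reconstruction for orientation only; the authoritative definition lives in internal/storage:

```go
// Reconstructed from call sites in this diff; not the canonical definition.
type ChunkManager interface {
	Write(filePath string, content []byte) error
	MultiWrite(contents map[string][]byte) error
	Read(filePath string) ([]byte, error)
	MultiRead(filePaths []string) ([][]byte, error)
	ReadWithPrefix(prefix string) ([]string, [][]byte, error)
	Remove(filePath string) error
	MultiRemove(filePaths []string) error
	RemoveWithPrefix(prefix string) error
	Close()
}
```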

View File

@ -24,8 +24,6 @@ import (
"time"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/kv"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/storage"
"github.com/stretchr/testify/assert"
@ -33,11 +31,14 @@ import (
"go.uber.org/zap"
)
var binlogTestDir = "/tmp/milvus_test/test_binlog_io"
func TestBinlogIOInterfaceMethods(t *testing.T) {
alloc := NewAllocatorFactory()
kv := memkv.NewMemoryKV()
cm := storage.NewLocalChunkManager(storage.RootPath(binlogTestDir))
defer cm.RemoveWithPrefix("")
b := &binlogIO{kv, alloc}
b := &binlogIO{cm, alloc}
t.Run("Test upload", func(t *testing.T) {
f := &MetaFactory{}
meta := f.GetCollectionMeta(UniqueID(10001), "uploads")
@ -105,8 +106,8 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
assert.Error(t, err)
assert.Empty(t, p)
mkv := &mockKv{errMultiSave: true}
bin := &binlogIO{mkv, alloc}
mkc := &mockCm{errMultiSave: true}
bin := &binlogIO{mkc, alloc}
iData = genInsertData()
dData = &DeleteData{
Pks: []int64{1},
@ -138,7 +139,7 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
if test.isvalid {
inkeys := []string{}
for _, k := range test.ks {
blob, key, err := prepareBlob(kv, k)
blob, key, err := prepareBlob(cm, k)
require.NoError(t, err)
assert.NotEmpty(t, blob)
inkeys = append(inkeys, key)
@ -163,8 +164,8 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
})
t.Run("Test download twice", func(t *testing.T) {
mkv := &mockKv{errMultiLoad: true}
b := &binlogIO{mkv, alloc}
mkc := &mockCm{errMultiLoad: true}
b := &binlogIO{mkc, alloc}
ctx, cancel := context.WithTimeout(context.TODO(), time.Millisecond*20)
blobs, err := b.download(ctx, []string{"a"})
@ -174,11 +175,11 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
})
}
func prepareBlob(kv kv.BaseKV, key string) ([]byte, string, error) {
func prepareBlob(cm storage.ChunkManager, key string) ([]byte, string, error) {
k := path.Join("test_prepare_blob", key)
blob := []byte{1, 2, 3, 255, 188}
err := kv.Save(k, string(blob[:]))
err := cm.Write(k, blob[:])
if err != nil {
return nil, "", err
}
@ -187,8 +188,10 @@ func prepareBlob(kv kv.BaseKV, key string) ([]byte, string, error) {
func TestBinlogIOInnerMethods(t *testing.T) {
alloc := NewAllocatorFactory()
cm := storage.NewLocalChunkManager(storage.RootPath(binlogTestDir))
defer cm.RemoveWithPrefix("")
b := &binlogIO{
memkv.NewMemoryKV(),
cm,
alloc,
}
@ -234,7 +237,7 @@ func TestBinlogIOInnerMethods(t *testing.T) {
errAlloc := NewAllocatorFactory()
errAlloc.isvalid = false
bin := binlogIO{memkv.NewMemoryKV(), errAlloc}
bin := binlogIO{cm, errAlloc}
k, v, err = bin.genDeltaBlobs(&DeleteData{Pks: []int64{1}, Tss: []uint64{1}}, 1, 1, 1)
assert.Error(t, err)
assert.Empty(t, k)
@ -276,7 +279,7 @@ func TestBinlogIOInnerMethods(t *testing.T) {
errAlloc := NewAllocatorFactory()
errAlloc.errAllocBatch = true
bin := &binlogIO{memkv.NewMemoryKV(), errAlloc}
bin := &binlogIO{cm, errAlloc}
kvs, pin, pstats, err = bin.genInsertBlobs(genInsertData(), 10, 1, meta)
assert.Error(t, err)
@ -332,36 +335,45 @@ func TestBinlogIOInnerMethods(t *testing.T) {
}
})
}
b.close()
})
}
type mockKv struct {
type mockCm struct {
storage.ChunkManager
errMultiLoad bool
errMultiSave bool
}
var _ kv.BaseKV = (*mockKv)(nil)
var _ storage.ChunkManager = (*mockCm)(nil)
func (mk *mockKv) Load(key string) (string, error) { return "", nil }
func (mk *mockKv) MultiLoad(keys []string) ([]string, error) {
if mk.errMultiLoad {
return []string{}, errors.New("mockKv multiload error")
}
return []string{"a"}, nil
func (mk *mockCm) Write(filePath string, content []byte) error {
return nil
}
func (mk *mockKv) LoadWithPrefix(key string) ([]string, []string, error) { return nil, nil, nil }
func (mk *mockKv) Save(key, value string) error { return nil }
func (mk *mockKv) MultiSave(kvs map[string]string) error {
func (mk *mockCm) MultiWrite(contents map[string][]byte) error {
if mk.errMultiSave {
return errors.New("mockKv multisave error")
}
return nil
}
func (mk *mockKv) Remove(key string) error { return nil }
func (mk *mockKv) MultiRemove(keys []string) error { return nil }
func (mk *mockKv) RemoveWithPrefix(key string) error { return nil }
func (mk *mockKv) Close() {}
func (mk *mockCm) Read(filePath string) ([]byte, error) {
return nil, nil
}
func (mk *mockCm) MultiRead(filePaths []string) ([][]byte, error) {
if mk.errMultiLoad {
return nil, errors.New("mockKv multiload error")
}
return [][]byte{[]byte("a")}, nil
}
func (mk *mockCm) ReadWithPrefix(prefix string) ([]string, [][]byte, error) {
return nil, nil, nil
}
func (mk *mockCm) Remove(key string) error { return nil }
func (mk *mockCm) MultiRemove(keys []string) error { return nil }
func (mk *mockCm) RemoveWithPrefix(key string) error { return nil }
func (mk *mockCm) Close() {}
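The mock rewrite above leans on a standard Go testing pattern: embedding the storage.ChunkManager interface lets mockCm satisfy the whole interface while overriding only the methods these tests exercise; any un-overridden method dispatches to the nil embedded value and panics, which is acceptable in focused tests. A usage sketch, assuming the fixtures above:

```go
// mockCm fails every MultiRead, so download must give up and surface an
// error once the context times out (mirrors "Test download twice" above).
b := &binlogIO{&mockCm{errMultiLoad: true}, alloc}
ctx, cancel := context.WithTimeout(context.TODO(), 20*time.Millisecond)
defer cancel()
blobs, err := b.download(ctx, []string{"a"})
// expect: err != nil, blobs == nil
```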

View File

@ -32,10 +32,14 @@ import (
"github.com/stretchr/testify/require"
)
var compactTestDir = "/tmp/milvus_test/compact"
func TestCompactionTaskInnerMethods(t *testing.T) {
cm := storage.NewLocalChunkManager(storage.RootPath(compactTestDir))
defer cm.RemoveWithPrefix("")
t.Run("Test getSegmentMeta", func(t *testing.T) {
rc := &RootCoordFactory{}
replica, err := newReplica(context.TODO(), rc, 1)
replica, err := newReplica(context.TODO(), rc, cm, 1)
require.NoError(t, err)
task := &compactionTask{
@ -394,6 +398,8 @@ func getInsertBlobs(segID UniqueID, iData *InsertData, meta *etcdpb.CollectionMe
}
func TestCompactorInterfaceMethods(t *testing.T) {
cm := storage.NewLocalChunkManager(storage.RootPath(compactTestDir))
defer cm.RemoveWithPrefix("")
notEmptySegmentBinlogs := []*datapb.CompactionSegmentBinlogs{{
SegmentID: 100,
FieldBinlogs: nil,
@ -445,9 +451,8 @@ func TestCompactorInterfaceMethods(t *testing.T) {
rc := &RootCoordFactory{}
dc := &DataCoordFactory{}
mockfm := &mockFlushManager{}
mockKv := memkv.NewMemoryKV()
mockbIO := &binlogIO{mockKv, alloc}
replica, err := newReplica(context.TODO(), rc, collID)
mockbIO := &binlogIO{cm, alloc}
replica, err := newReplica(context.TODO(), rc, cm, collID)
require.NoError(t, err)
replica.addFlushedSegmentWithPKs(segID, collID, partID, "channelname", 2, []UniqueID{1, 2})
@ -507,7 +512,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
RowCount: 2,
}
err = mockKv.RemoveWithPrefix("/")
err = cm.RemoveWithPrefix("/")
require.NoError(t, err)
cpaths, err = mockbIO.upload(context.TODO(), segID, partID, []*InsertData{iData}, deleteAllData, meta)
require.NoError(t, err)
@ -522,7 +527,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
replica.addFlushedSegmentWithPKs(segID, collID, partID, "channelname", 2, []UniqueID{1, 2})
// Compact empty segment
err = mockKv.RemoveWithPrefix("/")
err = cm.RemoveWithPrefix("/")
require.NoError(t, err)
cpaths, err = mockbIO.upload(context.TODO(), segID, partID, []*InsertData{iData}, dData, meta)
require.NoError(t, err)
@ -539,7 +544,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
plan.SegmentBinlogs = segBinlogs
// New test, remove all the binlogs in storage
// Deltas in timetravel range
err = mockKv.RemoveWithPrefix("/")
err = cm.RemoveWithPrefix("/")
require.NoError(t, err)
cpaths, err = mockbIO.upload(context.TODO(), segID, partID, []*InsertData{iData}, dData, meta)
require.NoError(t, err)
@ -555,7 +560,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
// New test, remove all the binlogs in storage
// Timeout
err = mockKv.RemoveWithPrefix("/")
err = cm.RemoveWithPrefix("/")
require.NoError(t, err)
cpaths, err = mockbIO.upload(context.TODO(), segID, partID, []*InsertData{iData}, dData, meta)
require.NoError(t, err)
@ -574,8 +579,8 @@ func TestCompactorInterfaceMethods(t *testing.T) {
dc := &DataCoordFactory{}
mockfm := &mockFlushManager{}
mockKv := memkv.NewMemoryKV()
mockbIO := &binlogIO{mockKv, alloc}
replica, err := newReplica(context.TODO(), rc, collID)
mockbIO := &binlogIO{cm, alloc}
replica, err := newReplica(context.TODO(), rc, cm, collID)
require.NoError(t, err)
replica.addFlushedSegmentWithPKs(segID1, collID, partID, "channelname", 2, []UniqueID{1})
@ -699,9 +704,8 @@ func TestCompactorInterfaceMethods(t *testing.T) {
rc := &RootCoordFactory{}
dc := &DataCoordFactory{}
mockfm := &mockFlushManager{}
mockKv := memkv.NewMemoryKV()
mockbIO := &binlogIO{mockKv, alloc}
replica, err := newReplica(context.TODO(), rc, collID)
mockbIO := &binlogIO{cm, alloc}
replica, err := newReplica(context.TODO(), rc, cm, collID)
require.NoError(t, err)
replica.addFlushedSegmentWithPKs(segID1, collID, partID, "channelname", 2, []UniqueID{1})

View File

@ -38,7 +38,6 @@ import (
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
miniokv "github.com/milvus-io/milvus/internal/kv/minio"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/mq/msgstream"
@ -47,6 +46,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/logutil"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
@ -111,9 +111,9 @@ type DataNode struct {
rootCoord types.RootCoord
dataCoord types.DataCoord
session *sessionutil.Session
watchKv kv.MetaKv
blobKv kv.BaseKV
session *sessionutil.Session
watchKv kv.MetaKv
chunkManager storage.ChunkManager
closer io.Closer
@ -472,21 +472,19 @@ func (node *DataNode) Start() error {
return errors.New("DataNode fail to connect etcd")
}
option := &miniokv.Option{
Address: Params.MinioCfg.Address,
AccessKeyID: Params.MinioCfg.AccessKeyID,
SecretAccessKeyID: Params.MinioCfg.SecretAccessKey,
UseSSL: Params.MinioCfg.UseSSL,
BucketName: Params.MinioCfg.BucketName,
CreateBucket: true,
}
chunkManager, err := storage.NewMinioChunkManager(node.ctx,
storage.Address(Params.MinioCfg.Address),
storage.AccessKeyID(Params.MinioCfg.AccessKeyID),
storage.SecretAccessKeyID(Params.MinioCfg.SecretAccessKey),
storage.UseSSL(Params.MinioCfg.UseSSL),
storage.BucketName(Params.MinioCfg.BucketName),
storage.CreateBucket(true))
kv, err := miniokv.NewMinIOKV(node.ctx, option)
if err != nil {
return err
}
node.blobKv = kv
node.chunkManager = chunkManager
if rep.Status.ErrorCode != commonpb.ErrorCode_Success || err != nil {
return errors.New("DataNode fail to start")
@ -767,7 +765,7 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan
return status, nil
}
binlogIO := &binlogIO{node.blobKv, ds.idAllocator}
binlogIO := &binlogIO{node.chunkManager, ds.idAllocator}
task := newCompactionTask(
node.ctx,
binlogIO, binlogIO,
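The construction change above replaces a miniokv.Option struct literal with functional options on storage.NewMinioChunkManager. For readers unfamiliar with the pattern, here is a hedged sketch of what such option setters typically look like; the real types live in internal/storage and these names are illustrative:

```go
// Illustrative only: the actual config fields and Option type are
// defined in internal/storage, not here.
type config struct {
	address         string
	accessKeyID     string
	secretAccessKey string
	useSSL          bool
	bucketName      string
	createBucket    bool
}

type Option func(*config)

func Address(addr string) Option    { return func(c *config) { c.address = addr } }
func UseSSL(useSSL bool) Option     { return func(c *config) { c.useSSL = useSSL } }
func BucketName(name string) Option { return func(c *config) { c.bucketName = name } }
```

Compared with the option struct, new knobs can be added later without breaking existing call sites, and defaults stay encapsulated in the constructor.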

View File

@ -21,11 +21,11 @@ import (
"errors"
"fmt"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/flowgraph"
"go.uber.org/zap"
@ -47,7 +47,7 @@ type dataSyncService struct {
flushingSegCache *Cache // a guarding cache stores currently flushing segment ids
flushManager flushManager // flush manager handles flush process
blobKV kv.BaseKV
chunkManager storage.ChunkManager
compactor *compactionExecutor // reference to compaction executor
}
@ -60,7 +60,7 @@ func newDataSyncService(ctx context.Context,
clearSignal chan<- string,
dataCoord types.DataCoord,
flushingSegCache *Cache,
blobKV kv.BaseKV,
chunkManager storage.ChunkManager,
compactor *compactionExecutor,
) (*dataSyncService, error) {
@ -83,7 +83,7 @@ func newDataSyncService(ctx context.Context,
dataCoord: dataCoord,
clearSignal: clearSignal,
flushingSegCache: flushingSegCache,
blobKV: blobKV,
chunkManager: chunkManager,
compactor: compactor,
}
@ -142,7 +142,7 @@ func (dsService *dataSyncService) close() {
func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) error {
dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx)
// initialize flush manager for DataSync Service
dsService.flushManager = NewRendezvousFlushManager(dsService.idAllocator, dsService.blobKV, dsService.replica,
dsService.flushManager = NewRendezvousFlushManager(dsService.idAllocator, dsService.chunkManager, dsService.replica,
flushNotifyFunc(dsService), dropVirtualChannelFunc(dsService))
// recover segment checkpoints

View File

@ -24,11 +24,11 @@ import (
"testing"
"time"
"github.com/milvus-io/milvus/internal/storage"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/common"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
@ -36,6 +36,8 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
)
var dataSyncServiceTestDir = "/tmp/milvus_test/data_sync_service"
func getVchanInfo(info *testInfo) *datapb.VchannelInfo {
var ufs []*datapb.SegmentInfo
var fs []*datapb.SegmentInfo
@ -127,12 +129,14 @@ func TestDataSyncService_newDataSyncService(te *testing.T) {
0, 0, "", 0,
"replica nil"},
}
cm := storage.NewLocalChunkManager(storage.RootPath(dataSyncServiceTestDir))
defer cm.RemoveWithPrefix("")
for _, test := range tests {
te.Run(test.description, func(t *testing.T) {
df := &DataCoordFactory{}
replica, err := newReplica(context.Background(), &RootCoordFactory{}, test.collID)
replica, err := newReplica(context.Background(), &RootCoordFactory{}, cm, test.collID)
assert.Nil(t, err)
if test.replicaNil {
replica = nil
@ -147,7 +151,7 @@ func TestDataSyncService_newDataSyncService(te *testing.T) {
make(chan string),
df,
newCache(),
memkv.NewMemoryKV(),
cm,
newCompactionExecutor(),
)
@ -185,7 +189,9 @@ func TestDataSyncService_Start(t *testing.T) {
collectionID := UniqueID(1)
flushChan := make(chan flushMsg, 100)
replica, err := newReplica(context.Background(), mockRootCoord, collectionID)
cm := storage.NewLocalChunkManager(storage.RootPath(dataSyncServiceTestDir))
defer cm.RemoveWithPrefix("")
replica, err := newReplica(context.Background(), mockRootCoord, cm, collectionID)
assert.Nil(t, err)
allocFactory := NewAllocatorFactory(1)
@ -225,7 +231,7 @@ func TestDataSyncService_Start(t *testing.T) {
}
signalCh := make(chan string, 100)
sync, err := newDataSyncService(ctx, flushChan, replica, allocFactory, msFactory, vchan, signalCh, &DataCoordFactory{}, newCache(), memkv.NewMemoryKV(), newCompactionExecutor())
sync, err := newDataSyncService(ctx, flushChan, replica, allocFactory, msFactory, vchan, signalCh, &DataCoordFactory{}, newCache(), cm, newCompactionExecutor())
assert.Nil(t, err)
// sync.replica.addCollection(collMeta.ID, collMeta.Schema)

View File

@ -24,13 +24,15 @@ import (
"github.com/bits-and-blooms/bloom/v3"
"github.com/milvus-io/milvus/internal/common"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/flowgraph"
"github.com/stretchr/testify/assert"
)
var deleteNodeTestDir = "/tmp/milvus_test/deleteNode"
type mockReplica struct {
Replica
@ -214,8 +216,9 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
pks = []int64{3, 17, 44, 190, 425}
)
replica := genMockReplica(segIDs, pks, chanName)
kv := memkv.NewMemoryKV()
fm := NewRendezvousFlushManager(NewAllocatorFactory(), kv, replica, func(*segmentFlushPack) {}, emptyFlushAndDropFunc)
cm := storage.NewLocalChunkManager(storage.RootPath(deleteNodeTestDir))
defer cm.RemoveWithPrefix("")
fm := NewRendezvousFlushManager(NewAllocatorFactory(), cm, replica, func(*segmentFlushPack) {}, emptyFlushAndDropFunc)
t.Run("Test get segment by primary keys", func(te *testing.T) {
c := &nodeConfig{
replica: replica,

View File

@ -23,10 +23,10 @@ import (
"testing"
"time"
"github.com/milvus-io/milvus/internal/storage"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/flowgraph"
@ -38,6 +38,8 @@ import (
"github.com/milvus-io/milvus/internal/proto/schemapb"
)
var insertNodeTestDir = "/tmp/milvus_test/insert_node"
// CDFMsFactory count down fails msg factory
type CDFMsFactory struct {
msgstream.Factory
@ -56,6 +58,8 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
cm := storage.NewLocalChunkManager(storage.RootPath(insertNodeTestDir))
defer cm.RemoveWithPrefix("")
insertChannelName := "datanode-01-test-flowgraphinsertbuffernode-create"
testPath := "/test/datanode/root/meta"
@ -67,7 +71,7 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) {
collMeta := Factory.GetCollectionMeta(UniqueID(0), "coll1")
mockRootCoord := &RootCoordFactory{}
replica, err := newReplica(ctx, mockRootCoord, collMeta.ID)
replica, err := newReplica(ctx, mockRootCoord, cm, collMeta.ID)
assert.Nil(t, err)
err = replica.addNewSegment(1, collMeta.ID, 0, insertChannelName, &internalpb.MsgPosition{}, &internalpb.MsgPosition{})
@ -81,9 +85,7 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) {
err = msFactory.SetParams(m)
assert.Nil(t, err)
memkv := memkv.NewMemoryKV()
fm := NewRendezvousFlushManager(&allocator{}, memkv, replica, func(*segmentFlushPack) {}, emptyFlushAndDropFunc)
fm := NewRendezvousFlushManager(&allocator{}, cm, replica, func(*segmentFlushPack) {}, emptyFlushAndDropFunc)
flushChan := make(chan flushMsg, 100)
@ -148,6 +150,8 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
insertChannelName := "datanode-01-test-flowgraphinsertbuffernode-operate"
cm := storage.NewLocalChunkManager(storage.RootPath(insertNodeTestDir))
defer cm.RemoveWithPrefix("")
testPath := "/test/datanode/root/meta"
err := clearEtcd(testPath)
require.NoError(t, err)
@ -157,7 +161,7 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
collMeta := Factory.GetCollectionMeta(UniqueID(0), "coll1")
mockRootCoord := &RootCoordFactory{}
replica, err := newReplica(ctx, mockRootCoord, collMeta.ID)
replica, err := newReplica(ctx, mockRootCoord, cm, collMeta.ID)
assert.Nil(t, err)
err = replica.addNewSegment(1, collMeta.ID, 0, insertChannelName, &internalpb.MsgPosition{}, &internalpb.MsgPosition{})
@ -171,9 +175,7 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
err = msFactory.SetParams(m)
assert.Nil(t, err)
memkv := memkv.NewMemoryKV()
fm := NewRendezvousFlushManager(NewAllocatorFactory(), memkv, replica, func(*segmentFlushPack) {}, emptyFlushAndDropFunc)
fm := NewRendezvousFlushManager(NewAllocatorFactory(), cm, replica, func(*segmentFlushPack) {}, emptyFlushAndDropFunc)
flushChan := make(chan flushMsg, 100)
c := &nodeConfig{
@ -379,10 +381,11 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
flushPacks := []*segmentFlushPack{}
fpMut := sync.Mutex{}
memkv := memkv.NewMemoryKV()
wg := sync.WaitGroup{}
fm := NewRendezvousFlushManager(NewAllocatorFactory(), memkv, colRep, func(pack *segmentFlushPack) {
cm := storage.NewLocalChunkManager(storage.RootPath(insertNodeTestDir))
defer cm.RemoveWithPrefix("")
fm := NewRendezvousFlushManager(NewAllocatorFactory(), cm, colRep, func(pack *segmentFlushPack) {
fpMut.Lock()
flushPacks = append(flushPacks, pack)
fpMut.Unlock()
@ -637,7 +640,9 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) {
compactTs: 100,
}
replica, err := newReplica(ctx, mockRootCoord, collMeta.ID)
cm := storage.NewLocalChunkManager(storage.RootPath(insertNodeTestDir))
defer cm.RemoveWithPrefix("")
replica, err := newReplica(ctx, mockRootCoord, cm, collMeta.ID)
assert.Nil(t, err)
err = replica.addNewSegment(1, collMeta.ID, 0, insertChannelName, &internalpb.MsgPosition{}, &internalpb.MsgPosition{})
@ -651,9 +656,7 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) {
err = msFactory.SetParams(m)
assert.Nil(t, err)
memkv := memkv.NewMemoryKV()
fm := NewRendezvousFlushManager(&allocator{}, memkv, replica, func(*segmentFlushPack) {}, emptyFlushAndDropFunc)
fm := NewRendezvousFlushManager(&allocator{}, cm, replica, func(*segmentFlushPack) {}, emptyFlushAndDropFunc)
flushChan := make(chan flushMsg, 100)
c := &nodeConfig{
@ -681,6 +684,8 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) {
}
func TestInsertBufferNode_updateSegStatesInReplica(te *testing.T) {
cm := storage.NewLocalChunkManager(storage.RootPath(insertNodeTestDir))
defer cm.RemoveWithPrefix("")
invalidTests := []struct {
replicaCollID UniqueID
@ -692,7 +697,7 @@ func TestInsertBufferNode_updateSegStatesInReplica(te *testing.T) {
}
for _, test := range invalidTests {
replica, err := newReplica(context.Background(), &RootCoordFactory{}, test.replicaCollID)
replica, err := newReplica(context.Background(), &RootCoordFactory{}, cm, test.replicaCollID)
assert.Nil(te, err)
ibNode := &insertBufferNode{

View File

@ -47,7 +47,7 @@ func (fm *flowgraphManager) addAndStart(dn *DataNode, vchan *datapb.VchannelInfo
return nil
}
replica, err := newReplica(dn.ctx, dn.rootCoord, vchan.GetCollectionID())
replica, err := newReplica(dn.ctx, dn.rootCoord, dn.chunkManager, vchan.GetCollectionID())
if err != nil {
log.Warn("new replica failed", zap.String("vChannelName", vchan.GetChannelName()), zap.Error(err))
return err
@ -55,7 +55,7 @@ func (fm *flowgraphManager) addAndStart(dn *DataNode, vchan *datapb.VchannelInfo
var alloc allocatorInterface = newAllocator(dn.rootCoord)
dataSyncService, err := newDataSyncService(dn.ctx, make(chan flushMsg, 100), replica, alloc, dn.msFactory, vchan, dn.clearSignal, dn.dataCoord, dn.segmentCache, dn.blobKv, dn.compactionExecutor)
dataSyncService, err := newDataSyncService(dn.ctx, make(chan flushMsg, 100), replica, alloc, dn.msFactory, vchan, dn.clearSignal, dn.dataCoord, dn.segmentCache, dn.chunkManager, dn.compactionExecutor)
if err != nil {
log.Warn("new data sync service fail", zap.String("vChannelName", vchan.GetChannelName()), zap.Error(err))
return err
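Worth noting the dependency flow this refactor establishes: DataNode.Start constructs a single chunk manager, addAndStart above threads dn.chunkManager into both newReplica and newDataSyncService, and the sync service in turn hands it to NewRendezvousFlushManager. One storage client is shared per node, where previously newReplica built its own MinIO KV client internally (see the segment_replica.go hunks below).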

View File

@ -23,7 +23,6 @@ import (
"strconv"
"sync"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/commonpb"
@ -253,7 +252,7 @@ type dropHandler struct {
// rendezvousFlushManager makes sure insert & del buf all flushed
type rendezvousFlushManager struct {
allocatorInterface
kv.BaseKV
storage.ChunkManager
Replica
// segment id => flush queue
@ -356,7 +355,7 @@ func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID Uni
}
field2Insert := make(map[UniqueID]*datapb.Binlog, len(binLogs))
kvs := make(map[string]string, len(binLogs))
kvs := make(map[string][]byte, len(binLogs))
field2Logidx := make(map[UniqueID]UniqueID, len(binLogs))
for idx, blob := range binLogs {
fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64)
@ -371,7 +370,7 @@ func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID Uni
k := JoinIDPath(collID, partID, segmentID, fieldID, logidx)
key := path.Join(Params.DataNodeCfg.InsertBinlogRootPath, k)
kvs[key] = string(blob.Value[:])
kvs[key] = blob.Value[:]
field2Insert[fieldID] = &datapb.Binlog{
EntriesNum: data.size,
TimestampFrom: 0, //TODO
@ -397,7 +396,7 @@ func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID Uni
k := JoinIDPath(collID, partID, segmentID, fieldID, logidx)
key := path.Join(Params.DataNodeCfg.StatsBinlogRootPath, k)
kvs[key] = string(blob.Value)
kvs[key] = blob.Value
field2Stats[fieldID] = &datapb.Binlog{
EntriesNum: 0,
TimestampFrom: 0, //TODO
@ -409,8 +408,8 @@ func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID Uni
m.updateSegmentCheckPoint(segmentID)
m.handleInsertTask(segmentID, &flushBufferInsertTask{
BaseKV: m.BaseKV,
data: kvs,
ChunkManager: m.ChunkManager,
data: kvs,
}, field2Insert, field2Stats, flushed, dropped, pos)
metrics.DataNodeFlushSegmentLatency.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.NodeID)).Observe(float64(tr.ElapseSpan().Milliseconds()))
@ -447,13 +446,13 @@ func (m *rendezvousFlushManager) flushDelData(data *DelDataBuf, segmentID Unique
blobKey := JoinIDPath(collID, partID, segmentID, logID)
blobPath := path.Join(Params.DataNodeCfg.DeleteBinlogRootPath, blobKey)
kvs := map[string]string{blobPath: string(blob.Value[:])}
kvs := map[string][]byte{blobPath: blob.Value[:]}
data.LogSize = int64(len(blob.Value))
data.LogPath = blobPath
log.Info("delete blob path", zap.String("path", blobPath))
m.handleDeleteTask(segmentID, &flushBufferDeleteTask{
BaseKV: m.BaseKV,
data: kvs,
ChunkManager: m.ChunkManager,
data: kvs,
}, data, pos)
return nil
}
@ -547,18 +546,18 @@ func (m *rendezvousFlushManager) close() {
}
type flushBufferInsertTask struct {
kv.BaseKV
data map[string]string
storage.ChunkManager
data map[string][]byte
}
// flushInsertData implements flushInsertTask
func (t *flushBufferInsertTask) flushInsertData() error {
if t.BaseKV != nil && len(t.data) > 0 {
if t.ChunkManager != nil && len(t.data) > 0 {
for _, d := range t.data {
metrics.DataNodeFlushedSize.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.NodeID), metrics.InsertLabel).Add(float64(len(d)))
}
tr := timerecord.NewTimeRecorder("insertData")
err := t.MultiSave(t.data)
err := t.MultiWrite(t.data)
metrics.DataNodeSave2StorageLatency.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.NodeID), metrics.InsertLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
return err
}
@ -566,18 +565,18 @@ func (t *flushBufferInsertTask) flushInsertData() error {
}
type flushBufferDeleteTask struct {
kv.BaseKV
data map[string]string
storage.ChunkManager
data map[string][]byte
}
// flushDeleteData implements flushDeleteTask
func (t *flushBufferDeleteTask) flushDeleteData() error {
if len(t.data) > 0 && t.BaseKV != nil {
if len(t.data) > 0 && t.ChunkManager != nil {
for _, d := range t.data {
metrics.DataNodeFlushedSize.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.NodeID), metrics.DeleteLabel).Add(float64(len(d)))
}
tr := timerecord.NewTimeRecorder("deleteData")
err := t.MultiSave(t.data)
err := t.MultiWrite(t.data)
metrics.DataNodeSave2StorageLatency.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.NodeID), metrics.DeleteLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
return err
}
@ -585,10 +584,10 @@ func (t *flushBufferDeleteTask) flushDeleteData() error {
}
// NewRendezvousFlushManager creates a rendezvousFlushManager with the provided allocator and chunk manager
func NewRendezvousFlushManager(allocator allocatorInterface, kv kv.BaseKV, replica Replica, f notifyMetaFunc, drop flushAndDropFunc) *rendezvousFlushManager {
func NewRendezvousFlushManager(allocator allocatorInterface, cm storage.ChunkManager, replica Replica, f notifyMetaFunc, drop flushAndDropFunc) *rendezvousFlushManager {
fm := &rendezvousFlushManager{
allocatorInterface: allocator,
BaseKV: kv,
ChunkManager: cm,
notifyFunc: f,
Replica: replica,
dropHandler: dropHandler{
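With the flush path now staging payloads as map[string][]byte and committing them through a single MultiWrite, the round trip can be exercised end to end against the local chunk manager the tests below use. A minimal, self-contained sketch; the paths are illustrative:

```go
package main

import (
	"fmt"

	"github.com/milvus-io/milvus/internal/storage"
)

func main() {
	cm := storage.NewLocalChunkManager(storage.RootPath("/tmp/milvus_test/flush_sketch"))
	defer cm.RemoveWithPrefix("")

	// Stage binlogs keyed by storage path, as flushBufferData does, then
	// commit them in one call.
	kvs := map[string][]byte{
		"insert_log/1/2/100/0": {1, 2, 3},
		"stats_log/1/2/100/0":  {4, 5, 6},
	}
	if err := cm.MultiWrite(kvs); err != nil {
		panic(err)
	}

	values, err := cm.MultiRead([]string{"insert_log/1/2/100/0", "stats_log/1/2/100/0"})
	if err != nil {
		panic(err)
	}
	fmt.Println(values) // [[1 2 3] [4 5 6]]
}
```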

View File

@ -24,15 +24,17 @@ import (
"testing"
"time"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)
var flushTestDir = "/tmp/milvus_test/flush"
type emptyFlushTask struct{}
func (t *emptyFlushTask) flushInsertData() error {
@ -137,13 +139,14 @@ func TestOrderFlushQueue_Order(t *testing.T) {
}
func TestRendezvousFlushManager(t *testing.T) {
kv := memkv.NewMemoryKV()
cm := storage.NewLocalChunkManager(storage.RootPath(flushTestDir))
defer cm.RemoveWithPrefix("")
size := 1000
var counter atomic.Int64
finish := sync.WaitGroup{}
finish.Add(size)
m := NewRendezvousFlushManager(&allocator{}, kv, newMockReplica(), func(pack *segmentFlushPack) {
m := NewRendezvousFlushManager(&allocator{}, cm, newMockReplica(), func(pack *segmentFlushPack) {
counter.Inc()
finish.Done()
}, emptyFlushAndDropFunc)
@ -173,7 +176,8 @@ func TestRendezvousFlushManager(t *testing.T) {
}
func TestRendezvousFlushManager_Inject(t *testing.T) {
kv := memkv.NewMemoryKV()
cm := storage.NewLocalChunkManager(storage.RootPath(flushTestDir))
defer cm.RemoveWithPrefix("")
size := 1000
var counter atomic.Int64
@ -181,7 +185,7 @@ func TestRendezvousFlushManager_Inject(t *testing.T) {
finish.Add(size)
var packMut sync.Mutex
packs := make([]*segmentFlushPack, 0, size+3)
m := NewRendezvousFlushManager(&allocator{}, kv, newMockReplica(), func(pack *segmentFlushPack) {
m := NewRendezvousFlushManager(&allocator{}, cm, newMockReplica(), func(pack *segmentFlushPack) {
packMut.Lock()
packs = append(packs, pack)
packMut.Unlock()
@ -272,9 +276,9 @@ func TestRendezvousFlushManager_Inject(t *testing.T) {
}
func TestRendezvousFlushManager_getSegmentMeta(t *testing.T) {
memkv := memkv.NewMemoryKV()
cm := storage.NewLocalChunkManager(storage.RootPath(flushTestDir))
replica := newMockReplica()
fm := NewRendezvousFlushManager(NewAllocatorFactory(), memkv, replica, func(*segmentFlushPack) {
fm := NewRendezvousFlushManager(NewAllocatorFactory(), cm, replica, func(*segmentFlushPack) {
}, emptyFlushAndDropFunc)
// non exists segment
@ -293,13 +297,13 @@ func TestRendezvousFlushManager_getSegmentMeta(t *testing.T) {
}
func TestRendezvousFlushManager_waitForAllFlushQueue(t *testing.T) {
kv := memkv.NewMemoryKV()
cm := storage.NewLocalChunkManager(storage.RootPath(flushTestDir))
size := 1000
var counter atomic.Int64
var finish sync.WaitGroup
finish.Add(size)
m := NewRendezvousFlushManager(&allocator{}, kv, newMockReplica(), func(pack *segmentFlushPack) {
m := NewRendezvousFlushManager(&allocator{}, cm, newMockReplica(), func(pack *segmentFlushPack) {
counter.Inc()
finish.Done()
}, emptyFlushAndDropFunc)
@ -363,13 +367,13 @@ func TestRendezvousFlushManager_waitForAllFlushQueue(t *testing.T) {
func TestRendezvousFlushManager_dropMode(t *testing.T) {
t.Run("test drop mode", func(t *testing.T) {
kv := memkv.NewMemoryKV()
cm := storage.NewLocalChunkManager(storage.RootPath(flushTestDir))
var mut sync.Mutex
var result []*segmentFlushPack
signal := make(chan struct{})
m := NewRendezvousFlushManager(&allocator{}, kv, newMockReplica(), func(pack *segmentFlushPack) {
m := NewRendezvousFlushManager(&allocator{}, cm, newMockReplica(), func(pack *segmentFlushPack) {
}, func(packs []*segmentFlushPack) {
mut.Lock()
result = packs
@ -415,13 +419,13 @@ func TestRendezvousFlushManager_dropMode(t *testing.T) {
assert.Equal(t, len(target), len(output))
})
t.Run("test drop mode with injection", func(t *testing.T) {
kv := memkv.NewMemoryKV()
cm := storage.NewLocalChunkManager(storage.RootPath(flushTestDir))
var mut sync.Mutex
var result []*segmentFlushPack
signal := make(chan struct{})
m := NewRendezvousFlushManager(&allocator{}, kv, newMockReplica(), func(pack *segmentFlushPack) {
m := NewRendezvousFlushManager(&allocator{}, cm, newMockReplica(), func(pack *segmentFlushPack) {
}, func(packs []*segmentFlushPack) {
mut.Lock()
result = packs
@ -474,13 +478,13 @@ func TestRendezvousFlushManager_dropMode(t *testing.T) {
}
func TestRendezvousFlushManager_close(t *testing.T) {
kv := memkv.NewMemoryKV()
cm := storage.NewLocalChunkManager(storage.RootPath(flushTestDir))
size := 1000
var counter atomic.Int64
finish := sync.WaitGroup{}
finish.Add(size)
m := NewRendezvousFlushManager(&allocator{}, kv, newMockReplica(), func(pack *segmentFlushPack) {
m := NewRendezvousFlushManager(&allocator{}, cm, newMockReplica(), func(pack *segmentFlushPack) {
counter.Inc()
finish.Done()
}, emptyFlushAndDropFunc)
@ -513,8 +517,9 @@ func TestRendezvousFlushManager_close(t *testing.T) {
func TestFlushNotifyFunc(t *testing.T) {
ctx := context.Background()
rcf := &RootCoordFactory{}
cm := storage.NewLocalChunkManager(storage.RootPath(flushTestDir))
replica, err := newReplica(ctx, rcf, 1)
replica, err := newReplica(ctx, rcf, cm, 1)
require.NoError(t, err)
dataCoord := &DataCoordFactory{}
@ -564,7 +569,8 @@ func TestFlushNotifyFunc(t *testing.T) {
func TestDropVirtualChannelFunc(t *testing.T) {
ctx := context.Background()
rcf := &RootCoordFactory{}
replica, err := newReplica(ctx, rcf, 1)
cm := storage.NewLocalChunkManager(storage.RootPath(flushTestDir))
replica, err := newReplica(ctx, rcf, cm, 1)
require.NoError(t, err)
dataCoord := &DataCoordFactory{}

View File

@ -27,8 +27,6 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/kv"
miniokv "github.com/milvus-io/milvus/internal/kv/minio"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/datapb"
@ -105,8 +103,8 @@ type SegmentReplica struct {
flushedSegments map[UniqueID]*Segment
compactedSegments map[UniqueID]*Segment
metaService *metaService
minIOKV kv.BaseKV
metaService *metaService
chunkManager storage.ChunkManager
}
func (s *Segment) updatePKRange(pks []int64) {
@ -130,22 +128,7 @@ func (s *Segment) updatePKRange(pks []int64) {
var _ Replica = &SegmentReplica{}
func newReplica(ctx context.Context, rc types.RootCoord, collID UniqueID) (*SegmentReplica, error) {
// MinIO
option := &miniokv.Option{
Address: Params.MinioCfg.Address,
AccessKeyID: Params.MinioCfg.AccessKeyID,
SecretAccessKeyID: Params.MinioCfg.SecretAccessKey,
UseSSL: Params.MinioCfg.UseSSL,
BucketName: Params.MinioCfg.BucketName,
CreateBucket: true,
}
minIOKV, err := miniokv.NewMinIOKV(ctx, option)
if err != nil {
return nil, err
}
func newReplica(ctx context.Context, rc types.RootCoord, cm storage.ChunkManager, collID UniqueID) (*SegmentReplica, error) {
metaService := newMetaService(rc, collID)
replica := &SegmentReplica{
@ -156,8 +139,8 @@ func newReplica(ctx context.Context, rc types.RootCoord, collID UniqueID) (*Segm
flushedSegments: make(map[UniqueID]*Segment),
compactedSegments: make(map[UniqueID]*Segment),
metaService: metaService,
minIOKV: minIOKV,
metaService: metaService,
chunkManager: cm,
}
return replica, nil
@ -441,14 +424,14 @@ func (replica *SegmentReplica) initPKBloomFilter(s *Segment, statsBinlogs []*dat
}
}
values, err := replica.minIOKV.MultiLoad(bloomFilterFiles)
values, err := replica.chunkManager.MultiRead(bloomFilterFiles)
if err != nil {
log.Warn("failed to load bloom filter files", zap.Error(err))
return err
}
blobs := make([]*Blob, 0)
for i := 0; i < len(values); i++ {
blobs = append(blobs, &Blob{Value: []byte(values[i])})
blobs = append(blobs, &Blob{Value: values[i]})
}
stats, err := storage.DeserializeStats(blobs)
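The same copy elimination shows up in bloom-filter recovery: values now arrive as [][]byte, so the old []byte(values[i]) conversion (one allocation per stats blob) disappears. Consolidated from the fragments above:

```go
values, err := replica.chunkManager.MultiRead(bloomFilterFiles)
if err != nil {
	log.Warn("failed to load bloom filter files", zap.Error(err))
	return err
}
blobs := make([]*Blob, 0, len(values))
for i := 0; i < len(values); i++ {
	blobs = append(blobs, &Blob{Value: values[i]}) // no per-blob copy
}
stats, err := storage.DeserializeStats(blobs)
```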

View File

@ -29,24 +29,27 @@ import (
"github.com/stretchr/testify/require"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/storage"
)
var segmentReplicaNodeTestDir = "/tmp/milvus_test/segment_replica"
func TestNewReplica(t *testing.T) {
rc := &RootCoordFactory{}
replica, err := newReplica(context.Background(), rc, 0)
cm := storage.NewLocalChunkManager(storage.RootPath(segmentReplicaNodeTestDir))
defer cm.RemoveWithPrefix("")
replica, err := newReplica(context.Background(), rc, cm, 0)
assert.Nil(t, err)
assert.NotNil(t, replica)
}
type mockMinioKV struct {
kv.BaseKV
type mockDataCM struct {
storage.ChunkManager
}
func (kv *mockMinioKV) MultiLoad(keys []string) ([]string, error) {
func (kv *mockDataCM) MultiRead(keys []string) ([][]byte, error) {
stats := &storage.Int64Stats{
FieldID: common.RowIDField,
Min: 0,
@ -54,14 +57,14 @@ func (kv *mockMinioKV) MultiLoad(keys []string) ([]string, error) {
BF: bloom.NewWithEstimates(bloomFilterSize, maxBloomFalsePositive),
}
buffer, _ := json.Marshal(stats)
return []string{string(buffer)}, nil
return [][]byte{buffer}, nil
}
type mockPkfilterMergeError struct {
kv.BaseKV
storage.ChunkManager
}
func (kv *mockPkfilterMergeError) MultiLoad(keys []string) ([]string, error) {
func (kv *mockPkfilterMergeError) MultiRead(keys []string) ([][]byte, error) {
stats := &storage.Int64Stats{
FieldID: common.RowIDField,
Min: 0,
@ -69,23 +72,23 @@ func (kv *mockPkfilterMergeError) MultiLoad(keys []string) ([]string, error) {
BF: bloom.NewWithEstimates(1, 0.0001),
}
buffer, _ := json.Marshal(stats)
return []string{string(buffer)}, nil
return [][]byte{buffer}, nil
}
type mockMinioKVError struct {
kv.BaseKV
type mockDataCMError struct {
storage.ChunkManager
}
func (kv *mockMinioKVError) MultiLoad(keys []string) ([]string, error) {
func (kv *mockDataCMError) MultiRead(keys []string) ([][]byte, error) {
return nil, fmt.Errorf("mock error")
}
type mockMinioKVStatsError struct {
kv.BaseKV
type mockDataCMStatsError struct {
storage.ChunkManager
}
func (kv *mockMinioKVStatsError) MultiLoad(keys []string) ([]string, error) {
return []string{"3123123,error,test"}, nil
func (kv *mockDataCMStatsError) MultiRead(keys []string) ([][]byte, error) {
return [][]byte{[]byte("3123123,error,test")}, nil
}
func getSimpleFieldBinlog() *datapb.FieldBinlog {
@ -169,9 +172,11 @@ func TestSegmentReplica_getCollectionAndPartitionID(te *testing.T) {
func TestSegmentReplica(t *testing.T) {
rc := &RootCoordFactory{}
collID := UniqueID(1)
cm := storage.NewLocalChunkManager(storage.RootPath(segmentReplicaNodeTestDir))
defer cm.RemoveWithPrefix("")
t.Run("Test coll mot match", func(t *testing.T) {
replica, err := newReplica(context.Background(), rc, collID)
replica, err := newReplica(context.Background(), rc, cm, collID)
assert.Nil(t, err)
err = replica.addNewSegment(1, collID+1, 0, "", nil, nil)
@ -248,6 +253,8 @@ func TestSegmentReplica(t *testing.T) {
func TestSegmentReplica_InterfaceMethod(t *testing.T) {
rc := &RootCoordFactory{}
cm := storage.NewLocalChunkManager(storage.RootPath(segmentReplicaNodeTestDir))
defer cm.RemoveWithPrefix("")
t.Run("Test addFlushedSegmentWithPKs", func(t *testing.T) {
tests := []struct {
@ -263,7 +270,7 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) {
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
replica, err := newReplica(context.TODO(), rc, test.replicaCollID)
replica, err := newReplica(context.TODO(), rc, cm, test.replicaCollID)
require.NoError(t, err)
if test.isvalid {
replica.addFlushedSegmentWithPKs(100, test.incollID, 10, "a", 1, []int64{9})
@ -300,7 +307,7 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) {
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
sr, err := newReplica(context.Background(), rc, test.replicaCollID)
sr, err := newReplica(context.Background(), rc, cm, test.replicaCollID)
assert.Nil(t, err)
require.False(t, sr.hasSegment(test.inSegID, true))
err = sr.addNewSegment(test.inSegID,
@ -336,8 +343,7 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) {
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
sr, err := newReplica(context.Background(), rc, test.replicaCollID)
sr.minIOKV = &mockMinioKV{}
sr, err := newReplica(context.Background(), rc, &mockDataCM{}, test.replicaCollID)
assert.Nil(t, err)
require.False(t, sr.hasSegment(test.inSegID, true))
err = sr.addNormalSegment(test.inSegID, test.inCollID, 1, "", 0, []*datapb.FieldBinlog{getSimpleFieldBinlog()}, &segmentCheckPoint{}, 0)
@ -355,9 +361,8 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) {
})
t.Run("Test_addNormalSegmentWithNilDml", func(t *testing.T) {
sr, err := newReplica(context.Background(), rc, 1)
sr, err := newReplica(context.Background(), rc, &mockDataCM{}, 1)
require.NoError(t, err)
sr.minIOKV = &mockMinioKV{}
segID := int64(101)
require.False(t, sr.hasSegment(segID, true))
assert.NotPanics(t, func() {
@ -548,7 +553,7 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) {
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
sr, err := newReplica(context.Background(), rc, test.replicaCollID)
sr, err := newReplica(context.Background(), rc, cm, test.replicaCollID)
assert.Nil(t, err)
if test.metaServiceErr {
@ -581,9 +586,9 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) {
})
t.Run("Test_addSegmentMinIOLoadError", func(t *testing.T) {
sr, err := newReplica(context.Background(), rc, 1)
sr, err := newReplica(context.Background(), rc, cm, 1)
assert.Nil(t, err)
sr.minIOKV = &mockMinioKVError{}
sr.chunkManager = &mockDataCMError{}
cpPos := &internalpb.MsgPosition{ChannelName: "insert-01", Timestamp: Timestamp(10)}
cp := &segmentCheckPoint{int64(10), *cpPos}
@ -594,9 +599,9 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) {
})
t.Run("Test_addSegmentStatsError", func(t *testing.T) {
sr, err := newReplica(context.Background(), rc, 1)
sr, err := newReplica(context.Background(), rc, cm, 1)
assert.Nil(t, err)
sr.minIOKV = &mockMinioKVStatsError{}
sr.chunkManager = &mockDataCMStatsError{}
cpPos := &internalpb.MsgPosition{ChannelName: "insert-01", Timestamp: Timestamp(10)}
cp := &segmentCheckPoint{int64(10), *cpPos}
@ -607,9 +612,9 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) {
})
t.Run("Test_addSegmentPkfilterError", func(t *testing.T) {
sr, err := newReplica(context.Background(), rc, 1)
sr, err := newReplica(context.Background(), rc, cm, 1)
assert.Nil(t, err)
sr.minIOKV = &mockPkfilterMergeError{}
sr.chunkManager = &mockPkfilterMergeError{}
cpPos := &internalpb.MsgPosition{ChannelName: "insert-01", Timestamp: Timestamp(10)}
cp := &segmentCheckPoint{int64(10), *cpPos}
@ -620,7 +625,7 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) {
})
t.Run("Test_mergeFlushedSegments", func(t *testing.T) {
sr, err := newReplica(context.Background(), rc, 1)
sr, err := newReplica(context.Background(), rc, cm, 1)
assert.Nil(t, err)
sr.addFlushedSegmentWithPKs(1, 1, 0, "channel", 10, []UniqueID{1})
@ -645,9 +650,11 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) {
func TestInnerFunctionSegment(t *testing.T) {
rc := &RootCoordFactory{}
collID := UniqueID(1)
replica, err := newReplica(context.Background(), rc, collID)
cm := storage.NewLocalChunkManager(storage.RootPath(segmentReplicaNodeTestDir))
defer cm.RemoveWithPrefix("")
replica, err := newReplica(context.Background(), rc, cm, collID)
assert.Nil(t, err)
replica.minIOKV = &mockMinioKV{}
replica.chunkManager = &mockDataCM{}
assert.False(t, replica.hasSegment(0, true))
assert.False(t, replica.hasSegment(0, false))
@ -770,9 +777,11 @@ func TestReplica_UpdatePKRange(t *testing.T) {
cpPos := &internalpb.MsgPosition{ChannelName: chanName, Timestamp: Timestamp(10)}
cp := &segmentCheckPoint{int64(10), *cpPos}
replica, err := newReplica(context.Background(), rc, collID)
cm := storage.NewLocalChunkManager(storage.RootPath(segmentReplicaNodeTestDir))
defer cm.RemoveWithPrefix("")
replica, err := newReplica(context.Background(), rc, cm, collID)
assert.Nil(t, err)
replica.minIOKV = &mockMinioKV{}
replica.chunkManager = &mockDataCM{}
err = replica.addNewSegment(1, collID, partID, chanName, startPos, endPos)
assert.Nil(t, err)

View File

@ -29,6 +29,7 @@ import (
"time"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/storage"
"go.etcd.io/etcd/api/v3/mvccpb"
"go.uber.org/zap"
@ -36,9 +37,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
miniokv "github.com/milvus-io/milvus/internal/kv/minio"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
@ -83,8 +82,8 @@ type IndexCoord struct {
idAllocator *allocator.GlobalIDAllocator
etcdCli *clientv3.Client
kv kv.BaseKV
etcdCli *clientv3.Client
chunkManager storage.ChunkManager
metaTable *metaTable
nodeManager *NodeManager
@ -222,24 +221,23 @@ func (i *IndexCoord) Init() error {
return
}
option := &miniokv.Option{
Address: Params.MinioCfg.Address,
AccessKeyID: Params.MinioCfg.AccessKeyID,
SecretAccessKeyID: Params.MinioCfg.SecretAccessKey,
UseSSL: Params.MinioCfg.UseSSL,
BucketName: Params.MinioCfg.BucketName,
CreateBucket: true,
}
chunkManager, err := storage.NewMinioChunkManager(i.loopCtx,
storage.Address(Params.MinioCfg.Address),
storage.AccessKeyID(Params.MinioCfg.AccessKeyID),
storage.SecretAccessKeyID(Params.MinioCfg.SecretAccessKey),
storage.UseSSL(Params.MinioCfg.UseSSL),
storage.BucketName(Params.MinioCfg.BucketName),
storage.CreateBucket(true))
i.kv, err = miniokv.NewMinIOKV(i.loopCtx, option)
if err != nil {
log.Error("IndexCoord new minio kv failed", zap.Error(err))
log.Error("IndexCoord new minio chunkManager failed", zap.Error(err))
initErr = err
return
}
log.Debug("IndexCoord new minio kv success")
log.Debug("IndexCoord new minio chunkManager success")
i.chunkManager = chunkManager
i.sched, err = NewTaskScheduler(i.loopCtx, i.idAllocator, i.kv, i.metaTable)
i.sched, err = NewTaskScheduler(i.loopCtx, i.idAllocator, i.chunkManager, i.metaTable)
if err != nil {
log.Error("IndexCoord new task scheduler failed", zap.Error(err))
initErr = err
@ -722,7 +720,7 @@ func (i *IndexCoord) recycleUnusedIndexFiles() {
unusedIndexFilePathPrefix := Params.IndexCoordCfg.IndexStorageRootPath + "/" + strconv.Itoa(int(meta.indexMeta.IndexBuildID))
log.Debug("IndexCoord recycleUnusedIndexFiles",
zap.Int64("Recycle the index files for deleted index with indexBuildID", meta.indexMeta.IndexBuildID))
if err := i.kv.RemoveWithPrefix(unusedIndexFilePathPrefix); err != nil {
if err := i.chunkManager.RemoveWithPrefix(unusedIndexFilePathPrefix); err != nil {
log.Error("IndexCoord recycleUnusedIndexFiles Remove index files failed",
zap.Bool("MarkDeleted", true), zap.Error(err))
}
@ -734,7 +732,7 @@ func (i *IndexCoord) recycleUnusedIndexFiles() {
zap.Int64("Recycle the low version index files of the index with indexBuildID", meta.indexMeta.IndexBuildID))
for j := 1; j < int(meta.indexMeta.Version); j++ {
unusedIndexFilePathPrefix := Params.IndexCoordCfg.IndexStorageRootPath + "/" + strconv.Itoa(int(meta.indexMeta.IndexBuildID)) + "/" + strconv.Itoa(j)
if err := i.kv.RemoveWithPrefix(unusedIndexFilePathPrefix); err != nil {
if err := i.chunkManager.RemoveWithPrefix(unusedIndexFilePathPrefix); err != nil {
log.Error("IndexCoord recycleUnusedIndexFiles Remove index files failed",
zap.Bool("MarkDeleted", false), zap.Error(err))
}
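Index-file recycling now runs whole-prefix deletes through the chunk manager. From the path concatenations above, files for one build sit under &lt;IndexStorageRootPath&gt;/&lt;indexBuildID&gt;/&lt;version&gt;/..., which makes both recycle cases a single call each. A sketch, assuming buildID and currentVersion come from the index meta:

```go
prefix := Params.IndexCoordCfg.IndexStorageRootPath + "/" + strconv.Itoa(int(buildID))

// Case 1: the index was deleted; drop every file for this build.
if err := i.chunkManager.RemoveWithPrefix(prefix); err != nil {
	log.Error("IndexCoord recycle index files failed", zap.Error(err))
}

// Case 2: the index lives on; drop only versions older than the current one.
for v := 1; v < int(currentVersion); v++ {
	if err := i.chunkManager.RemoveWithPrefix(prefix + "/" + strconv.Itoa(v)); err != nil {
		log.Error("IndexCoord recycle stale index version failed", zap.Error(err))
	}
}
```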

View File

@ -22,10 +22,10 @@ import (
"errors"
"sync"
"github.com/milvus-io/milvus/internal/storage"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/opentracing/opentracing-go"
@ -207,7 +207,7 @@ type TaskScheduler struct {
idAllocator *allocator.GlobalIDAllocator
metaTable *metaTable
kv kv.BaseKV
cm storage.ChunkManager
wg sync.WaitGroup
ctx context.Context
@ -217,13 +217,13 @@ type TaskScheduler struct {
// NewTaskScheduler creates a new task scheduler of indexing tasks.
func NewTaskScheduler(ctx context.Context,
idAllocator *allocator.GlobalIDAllocator,
kv kv.BaseKV,
cm storage.ChunkManager,
table *metaTable) (*TaskScheduler, error) {
ctx1, cancel := context.WithCancel(ctx)
s := &TaskScheduler{
idAllocator: idAllocator,
metaTable: table,
kv: kv,
cm: cm,
ctx: ctx1,
cancel: cancel,
}

View File

@ -43,11 +43,10 @@ import (
"unsafe"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
miniokv "github.com/milvus-io/milvus/internal/kv/minio"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
@ -86,8 +85,8 @@ type IndexNode struct {
once sync.Once
kv kv.BaseKV
session *sessionutil.Session
chunkManager storage.ChunkManager
session *sessionutil.Session
// Add callback functions at different stages
startCallbacks []func()
@ -112,7 +111,7 @@ func NewIndexNode(ctx context.Context) (*IndexNode, error) {
loopCancel: cancel,
}
b.UpdateStateCode(internalpb.StateCode_Abnormal)
sc, err := NewTaskScheduler(b.loopCtx, b.kv)
sc, err := NewTaskScheduler(b.loopCtx, b.chunkManager)
if err != nil {
return nil, err
}
@ -182,22 +181,21 @@ func (i *IndexNode) Init() error {
etcdKV := etcdkv.NewEtcdKV(i.etcdCli, Params.EtcdCfg.MetaRootPath)
i.etcdKV = etcdKV
option := &miniokv.Option{
Address: Params.MinioCfg.Address,
AccessKeyID: Params.MinioCfg.AccessKeyID,
SecretAccessKeyID: Params.MinioCfg.SecretAccessKey,
UseSSL: Params.MinioCfg.UseSSL,
BucketName: Params.MinioCfg.BucketName,
CreateBucket: true,
}
kv, err := miniokv.NewMinIOKV(i.loopCtx, option)
chunkManager, err := storage.NewMinioChunkManager(i.loopCtx,
storage.Address(Params.MinioCfg.Address),
storage.AccessKeyID(Params.MinioCfg.AccessKeyID),
storage.SecretAccessKeyID(Params.MinioCfg.SecretAccessKey),
storage.UseSSL(Params.MinioCfg.UseSSL),
storage.BucketName(Params.MinioCfg.BucketName),
storage.CreateBucket(true))
if err != nil {
log.Error("IndexNode NewMinIOKV failed", zap.Error(err))
initErr = err
return
}
i.kv = kv
i.chunkManager = chunkManager
log.Debug("IndexNode NewMinIOKV succeeded")
i.closer = trace.InitTracing("index_node")
@ -294,7 +292,7 @@ func (i *IndexNode) CreateIndex(ctx context.Context, request *indexpb.CreateInde
done: make(chan error),
},
req: request,
kv: i.kv,
cm: i.chunkManager,
etcdKV: i.etcdKV,
nodeID: Params.IndexNodeCfg.NodeID,
serializedSize: 0,

View File

@ -121,14 +121,14 @@ func TestIndexNode(t *testing.T) {
}
binLogs, _, err := insertCodec.Serialize(999, 888, &insertData)
assert.Nil(t, err)
kvs := make(map[string]string, len(binLogs))
kvs := make(map[string][]byte, len(binLogs))
paths := make([]string, 0, len(binLogs))
for i, blob := range binLogs {
key := path.Join(floatVectorBinlogPath, strconv.Itoa(i))
paths = append(paths, key)
kvs[key] = string(blob.Value[:])
kvs[key] = blob.Value[:]
}
err = in.kv.MultiSave(kvs)
err = in.chunkManager.MultiWrite(kvs)
assert.Nil(t, err)
indexMeta := &indexpb.IndexMeta{
@ -186,10 +186,10 @@ func TestIndexNode(t *testing.T) {
err = proto.Unmarshal([]byte(strValue), &indexMetaTmp)
assert.Nil(t, err)
}
defer in.kv.MultiRemove(indexMetaTmp.IndexFilePaths)
defer in.chunkManager.MultiRemove(indexMetaTmp.IndexFilePaths)
defer func() {
for k := range kvs {
err = in.kv.Remove(k)
err = in.chunkManager.Remove(k)
assert.Nil(t, err)
}
}()
@ -236,14 +236,14 @@ func TestIndexNode(t *testing.T) {
}
binLogs, _, err := insertCodec.Serialize(999, 888, &insertData)
assert.Nil(t, err)
kvs := make(map[string]string, len(binLogs))
kvs := make(map[string][]byte, len(binLogs))
paths := make([]string, 0, len(binLogs))
for i, blob := range binLogs {
key := path.Join(binaryVectorBinlogPath, strconv.Itoa(i))
paths = append(paths, key)
kvs[key] = string(blob.Value[:])
kvs[key] = blob.Value[:]
}
err = in.kv.MultiSave(kvs)
err = in.chunkManager.MultiWrite(kvs)
assert.Nil(t, err)
indexMeta := &indexpb.IndexMeta{
@ -297,10 +297,10 @@ func TestIndexNode(t *testing.T) {
err = proto.Unmarshal([]byte(strValue), &indexMetaTmp)
assert.Nil(t, err)
}
defer in.kv.MultiRemove(indexMetaTmp.IndexFilePaths)
defer in.chunkManager.MultiRemove(indexMetaTmp.IndexFilePaths)
defer func() {
for k := range kvs {
err = in.kv.Remove(k)
err = in.chunkManager.Remove(k)
assert.Nil(t, err)
}
}()
@ -348,14 +348,14 @@ func TestIndexNode(t *testing.T) {
}
binLogs, _, err := insertCodec.Serialize(999, 888, &insertData)
assert.Nil(t, err)
kvs := make(map[string]string, len(binLogs))
kvs := make(map[string][]byte, len(binLogs))
paths := make([]string, 0, len(binLogs))
for i, blob := range binLogs {
key := path.Join(floatVectorBinlogPath, strconv.Itoa(i))
paths = append(paths, key)
kvs[key] = string(blob.Value[:])
kvs[key] = blob.Value[:]
}
err = in.kv.MultiSave(kvs)
err = in.chunkManager.MultiWrite(kvs)
assert.Nil(t, err)
indexMeta := &indexpb.IndexMeta{
@ -416,10 +416,10 @@ func TestIndexNode(t *testing.T) {
// err = proto.Unmarshal([]byte(strValue), &indexMetaTmp)
// assert.Nil(t, err)
//}
defer in.kv.MultiRemove(indexMetaTmp.IndexFilePaths)
defer in.chunkManager.MultiRemove(indexMetaTmp.IndexFilePaths)
defer func() {
for k := range kvs {
err = in.kv.Remove(k)
err = in.chunkManager.Remove(k)
assert.Nil(t, err)
}
}()
@ -534,14 +534,14 @@ func TestCreateIndexFailed(t *testing.T) {
}
binLogs, _, err := insertCodec.Serialize(999, 888, &insertData)
assert.Nil(t, err)
kvs := make(map[string]string, len(binLogs))
kvs := make(map[string][]byte, len(binLogs))
paths := make([]string, 0, len(binLogs))
for i, blob := range binLogs {
key := path.Join(floatVectorBinlogPath, strconv.Itoa(i))
paths = append(paths, key)
kvs[key] = string(blob.Value[:])
kvs[key] = blob.Value
}
err = in.kv.MultiSave(kvs)
err = in.chunkManager.MultiWrite(kvs)
assert.Nil(t, err)
indexMeta := &indexpb.IndexMeta{
@ -603,10 +603,10 @@ func TestCreateIndexFailed(t *testing.T) {
err = proto.Unmarshal([]byte(strValue), &indexMetaTmp)
assert.Nil(t, err)
}
defer in.kv.MultiRemove(indexMetaTmp.IndexFilePaths)
defer in.chunkManager.MultiRemove(indexMetaTmp.IndexFilePaths)
defer func() {
for k := range kvs {
err = in.kv.Remove(k)
err = in.chunkManager.Remove(k)
assert.Nil(t, err)
}
}()
@ -652,14 +652,14 @@ func TestCreateIndexFailed(t *testing.T) {
}
binLogs, _, err := insertCodec.Serialize(999, 888, &insertData)
assert.Nil(t, err)
kvs := make(map[string]string, len(binLogs))
kvs := make(map[string][]byte, len(binLogs))
paths := make([]string, 0, len(binLogs))
for i, blob := range binLogs {
key := path.Join(floatVectorBinlogPath, strconv.Itoa(i))
paths = append(paths, key)
kvs[key] = string(blob.Value[:])
kvs[key] = blob.Value
}
err = in.kv.MultiSave(kvs)
err = in.chunkManager.MultiWrite(kvs)
assert.Nil(t, err)
indexMeta2 := &indexpb.IndexMeta{
@ -722,10 +722,10 @@ func TestCreateIndexFailed(t *testing.T) {
err = proto.Unmarshal([]byte(strValue), &indexMetaTmp)
assert.Nil(t, err)
}
defer in.kv.MultiRemove(indexMetaTmp.IndexFilePaths)
defer in.chunkManager.MultiRemove(indexMetaTmp.IndexFilePaths)
defer func() {
for k := range kvs {
err = in.kv.Remove(k)
err = in.chunkManager.Remove(k)
assert.Nil(t, err)
}
}()
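
The serialize-and-write loop above recurs in every test case in this file. A hypothetical helper could factor it out; writeBinlogs and its signature are assumptions, not part of this commit:

// Hypothetical helper: serialize blobs under root and store them in one call.
func writeBinlogs(cm storage.ChunkManager, root string, binLogs []*storage.Blob) ([]string, error) {
	kvs := make(map[string][]byte, len(binLogs))
	paths := make([]string, 0, len(binLogs))
	for i, blob := range binLogs {
		key := path.Join(root, strconv.Itoa(i))
		paths = append(paths, key)
		kvs[key] = blob.Value
	}
	// MultiWrite stores all serialized binlogs at once.
	return paths, cm.MultiWrite(kvs)
}

Each case would then reduce to paths, err := writeBinlogs(in.chunkManager, floatVectorBinlogPath, binLogs).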

View File

@ -30,7 +30,6 @@ import (
"go.uber.org/zap"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
@ -120,7 +119,7 @@ func (bt *BaseTask) Notify(err error) {
type IndexBuildTask struct {
BaseTask
index Index
kv kv.BaseKV
cm storage.ChunkManager
etcdKV *etcdkv.EtcdKV
savePaths []string
req *indexpb.CreateIndexRequest
@ -329,11 +328,11 @@ func (it *IndexBuildTask) prepareParams(ctx context.Context) error {
func (it *IndexBuildTask) loadVector(ctx context.Context) (storage.FieldID, storage.FieldData, error) {
getValueByPath := func(path string) ([]byte, error) {
data, err := it.kv.Load(path)
data, err := it.cm.Read(path)
if err != nil {
return nil, err
}
return []byte(data), nil
return data, nil
}
getBlobByPath := func(path string) (*Blob, error) {
value, err := getValueByPath(path)
@ -516,7 +515,7 @@ func (it *IndexBuildTask) saveIndex(ctx context.Context, blobs []*storage.Blob)
zap.Any("indexMeta.Version", indexMeta.Version))
return errors.New("This task has been reassigned, check indexMeta.version and request ")
}
return it.kv.Save(savePath, string(blob.Value))
return it.cm.Write(savePath, blob.Value)
}
err := retry.Do(ctx, saveIndexFileFn, retry.Attempts(5))
if err != nil {
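
Since cm.Read already returns []byte, the getValueByPath wrapper earlier in this file is now a pass-through. A possible follow-up, not part of this commit, is to inline it into getBlobByPath:

// Possible follow-up (not in this commit): read directly in getBlobByPath.
getBlobByPath := func(p string) (*Blob, error) {
	value, err := it.cm.Read(p)
	if err != nil {
		return nil, err
	}
	return &Blob{Key: p, Value: value}, nil
}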

View File

@ -22,9 +22,9 @@ import (
"errors"
"sync"
"github.com/milvus-io/milvus/internal/storage"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/opentracing/opentracing-go"
@ -192,7 +192,7 @@ type TaskScheduler struct {
IndexBuildQueue TaskQueue
buildParallel int
kv kv.BaseKV
cm storage.ChunkManager
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
@ -200,10 +200,10 @@ type TaskScheduler struct {
// NewTaskScheduler creates a new task scheduler of indexing tasks.
func NewTaskScheduler(ctx context.Context,
kv kv.BaseKV) (*TaskScheduler, error) {
cm storage.ChunkManager) (*TaskScheduler, error) {
ctx1, cancel := context.WithCancel(ctx)
s := &TaskScheduler{
kv: kv,
cm: cm,
ctx: ctx1,
cancel: cancel,
buildParallel: 1, // default value
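
With the constructor taking a storage.ChunkManager, unit tests can now inject a local implementation instead of a live MinIO instance. A minimal sketch, assuming the LocalChunkManager constructor used elsewhere in this commit; the temp directory is a placeholder:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Placeholder path; RemoveWithPrefix("") clears everything under the root.
cm := storage.NewLocalChunkManager(storage.RootPath("/tmp/milvus_test/task_scheduler"))
defer cm.RemoveWithPrefix("")

scheduler, err := NewTaskScheduler(ctx, cm)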

View File

@ -1,378 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package miniokv
import (
"bytes"
"context"
"fmt"
"io/ioutil"
"sync"
"io"
"strings"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"go.uber.org/zap"
)
var _ kv.DataKV = (*MinIOKV)(nil)
// MinIOKV implements the DataKV interface and relies on the underlying MinIO service.
// MinIOKV object contains a client which can be used to access the MinIO service.
type MinIOKV struct {
ctx context.Context
minioClient *minio.Client
bucketName string
}
// Option option when creates MinIOKV.
type Option struct {
Address string
AccessKeyID string
BucketName string
SecretAccessKeyID string
UseSSL bool
CreateBucket bool // create the bucket if it does not exist
}
// NewMinIOKV creates a MinIOKV for saving objects to and loading objects from MinIO.
func NewMinIOKV(ctx context.Context, option *Option) (*MinIOKV, error) {
var minIOClient *minio.Client
var err error
minIOClient, err = minio.New(option.Address, &minio.Options{
Creds: credentials.NewStaticV4(option.AccessKeyID, option.SecretAccessKeyID, ""),
Secure: option.UseSSL,
})
// nil options or an invalidly formatted endpoint; no need to retry
if err != nil {
return nil, err
}
var bucketExists bool
// validity is checked on the first query
checkBucketFn := func() error {
bucketExists, err = minIOClient.BucketExists(ctx, option.BucketName)
if err != nil {
return err
}
if !bucketExists {
log.Debug("MinioKV NewMinioKV", zap.Any("Check bucket", "bucket not exist"))
if option.CreateBucket {
log.Debug("MinioKV NewMinioKV create bucket.")
return minIOClient.MakeBucket(ctx, option.BucketName, minio.MakeBucketOptions{})
}
return fmt.Errorf("bucket %s not Existed", option.BucketName)
}
return nil
}
err = retry.Do(ctx, checkBucketFn, retry.Attempts(300))
if err != nil {
return nil, err
}
kv := &MinIOKV{
ctx: ctx,
minioClient: minIOClient,
bucketName: option.BucketName,
}
log.Debug("MinioKV new MinioKV success.")
return kv, nil
}
// Exist checks whether a key exists in MinIO.
func (kv *MinIOKV) Exist(key string) bool {
_, err := kv.minioClient.StatObject(kv.ctx, kv.bucketName, key, minio.StatObjectOptions{})
return err == nil
}
// LoadWithPrefix loads objects with the same prefix @key from MinIO.
func (kv *MinIOKV) LoadWithPrefix(key string) ([]string, []string, error) {
objects := kv.minioClient.ListObjects(kv.ctx, kv.bucketName, minio.ListObjectsOptions{Prefix: key})
var objectsKeys []string
var objectsValues []string
for object := range objects {
objectsKeys = append(objectsKeys, object.Key)
}
objectsValues, err := kv.MultiLoad(objectsKeys)
if err != nil {
log.Error(fmt.Sprintf("MinIO load with prefix error. path = %s", key), zap.Error(err))
return nil, nil, err
}
return objectsKeys, objectsValues, nil
}
func (kv *MinIOKV) LoadBytesWithPrefix(key string) ([]string, [][]byte, error) {
objects := kv.minioClient.ListObjects(kv.ctx, kv.bucketName, minio.ListObjectsOptions{Prefix: key})
var (
objectsKeys = make([]string, 0, len(objects))
objectsValues [][]byte
)
for object := range objects {
objectsKeys = append(objectsKeys, object.Key)
}
objectsValues, err := kv.MultiLoadBytes(objectsKeys)
if err != nil {
log.Error(fmt.Sprintf("MinIO load with prefix error. path = %s", key), zap.Error(err))
return nil, nil, err
}
return objectsKeys, objectsValues, nil
}
// Load loads an object with @key.
func (kv *MinIOKV) Load(key string) (string, error) {
object, err := kv.minioClient.GetObject(kv.ctx, kv.bucketName, key, minio.GetObjectOptions{})
if err != nil {
return "", err
}
if object != nil {
defer object.Close()
}
info, err := object.Stat()
if err != nil {
return "", err
}
buf := new(strings.Builder)
buf.Grow(int(info.Size))
_, err = io.Copy(buf, object)
if err != nil && err != io.EOF {
return "", err
}
return buf.String(), nil
}
// LoadBytes loads an object with @key as raw bytes.
func (kv *MinIOKV) LoadBytes(key string) ([]byte, error) {
object, err := kv.minioClient.GetObject(kv.ctx, kv.bucketName, key, minio.GetObjectOptions{})
if err != nil {
return nil, err
}
if object != nil {
defer object.Close()
}
info, err := object.Stat()
if err != nil {
return nil, err
}
buf := bytes.NewBuffer(make([]byte, 0, info.Size))
_, err = io.Copy(buf, object)
if err != nil && err != io.EOF {
return nil, err
}
return buf.Bytes(), nil
}
// FGetObject downloads a file from MinIO to the local storage system.
func (kv *MinIOKV) FGetObject(key, localPath string) error {
return kv.minioClient.FGetObject(kv.ctx, kv.bucketName, key, localPath+key, minio.GetObjectOptions{})
}
// FGetObjects downloads files from MinIO to the local storage system.
// For parallel downloads, n goroutines are started, one per key.
func (kv *MinIOKV) FGetObjects(keys []string, localPath string) error {
var wg sync.WaitGroup
el := make(errorList, len(keys))
for i, key := range keys {
wg.Add(1)
go func(i int, key string) {
err := kv.minioClient.FGetObject(kv.ctx, kv.bucketName, key, localPath+key, minio.GetObjectOptions{})
if err != nil {
el[i] = err
}
wg.Done()
}(i, key)
}
wg.Wait()
for _, err := range el {
if err != nil {
return el
}
}
return nil
}
// MultiLoad loads objects with multiple @keys.
func (kv *MinIOKV) MultiLoad(keys []string) ([]string, error) {
var objectsValues []string
for _, key := range keys {
objectValue, err := kv.Load(key)
if err != nil {
return nil, err
}
objectsValues = append(objectsValues, objectValue)
}
return objectsValues, nil
}
func (kv *MinIOKV) MultiLoadBytes(keys []string) ([][]byte, error) {
objectsValues := make([][]byte, 0, len(keys))
for _, key := range keys {
objectValue, err := kv.LoadBytes(key)
if err != nil {
return nil, err
}
objectsValues = append(objectsValues, objectValue)
}
return objectsValues, nil
}
// Save saves an object with @key to MinIO. The object value is @value.
func (kv *MinIOKV) Save(key, value string) error {
reader := strings.NewReader(value)
_, err := kv.minioClient.PutObject(kv.ctx, kv.bucketName, key, reader, int64(len(value)), minio.PutObjectOptions{})
return err
}
func (kv *MinIOKV) SaveBytes(key string, value []byte) error {
reader := bytes.NewReader(value)
_, err := kv.minioClient.PutObject(kv.ctx, kv.bucketName, key, reader, int64(len(value)), minio.PutObjectOptions{})
return err
}
// MultiSave saves multiple objects; each object path is a key of @kvs
// and the object value is the corresponding map value.
func (kv *MinIOKV) MultiSave(kvs map[string]string) error {
for key, value := range kvs {
err := kv.Save(key, value)
if err != nil {
return err
}
}
return nil
}
func (kv *MinIOKV) MultiSaveBytes(kvs map[string][]byte) error {
for key, value := range kvs {
err := kv.SaveBytes(key, value)
if err != nil {
return err
}
}
return nil
}
// RemoveWithPrefix removes all objects with the same prefix @prefix from minio.
func (kv *MinIOKV) RemoveWithPrefix(prefix string) error {
objectsCh := make(chan minio.ObjectInfo)
go func() {
defer close(objectsCh)
for object := range kv.minioClient.ListObjects(kv.ctx, kv.bucketName, minio.ListObjectsOptions{Prefix: prefix, Recursive: true}) {
objectsCh <- object
}
}()
for rErr := range kv.minioClient.RemoveObjects(kv.ctx, kv.bucketName, objectsCh, minio.RemoveObjectsOptions{GovernanceBypass: true}) {
if rErr.Err != nil {
return rErr.Err
}
}
return nil
}
// Remove deletes an object with @key.
func (kv *MinIOKV) Remove(key string) error {
return kv.minioClient.RemoveObject(kv.ctx, kv.bucketName, key, minio.RemoveObjectOptions{})
}
// MultiRemove deletes objects with @keys.
func (kv *MinIOKV) MultiRemove(keys []string) error {
for _, key := range keys {
err := kv.Remove(key)
if err != nil {
return err
}
}
return nil
}
// LoadPartial loads the byte range [start, end) of the object with @key.
func (kv *MinIOKV) LoadPartial(key string, start, end int64) ([]byte, error) {
switch {
case start < 0 || end < 0:
return nil, fmt.Errorf("invalid range specified: start=%d end=%d",
start, end)
case start >= end:
return nil, fmt.Errorf("invalid range specified: start=%d end=%d",
start, end)
}
opts := minio.GetObjectOptions{}
err := opts.SetRange(start, end-1)
if err != nil {
return nil, err
}
object, err := kv.minioClient.GetObject(kv.ctx, kv.bucketName, key, opts)
if err != nil {
return nil, err
}
defer object.Close()
return ioutil.ReadAll(object)
}
// GetSize obtains the data size of the object with @key.
func (kv *MinIOKV) GetSize(key string) (int64, error) {
objectInfo, err := kv.minioClient.StatObject(kv.ctx, kv.bucketName, key, minio.StatObjectOptions{})
if err != nil {
return 0, err
}
return objectInfo.Size, nil
}
// Close closes the MinIOKV.
func (kv *MinIOKV) Close() {
}
type errorList []error
func (el errorList) Error() string {
var builder strings.Builder
builder.WriteString("All downloads results:\n")
for index, err := range el {
builder.WriteString(fmt.Sprintf("downloads #%d:%s\n", index+1, err.Error()))
}
return builder.String()
}
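
For callers still holding string values mid-migration, the deleted MinIOKV methods could be expressed as a thin shim over a ChunkManager. A hypothetical sketch, not part of this commit; only the conversions are real work:

// kvShim is a hypothetical adapter from the old string-based KV methods
// to the byte-based ChunkManager methods used in this commit.
type kvShim struct {
	cm storage.ChunkManager
}

func (s kvShim) Load(key string) (string, error) {
	data, err := s.cm.Read(key)
	if err != nil {
		return "", err
	}
	return string(data), nil
}

func (s kvShim) Save(key, value string) error {
	return s.cm.Write(key, []byte(value))
}

func (s kvShim) MultiSave(kvs map[string]string) error {
	byteKvs := make(map[string][]byte, len(kvs))
	for k, v := range kvs {
		byteKvs[k] = []byte(v)
	}
	return s.cm.MultiWrite(byteKvs)
}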

View File

@ -1,483 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package miniokv_test
import (
"context"
"io/ioutil"
"os"
"path"
"strconv"
"testing"
miniokv "github.com/milvus-io/milvus/internal/kv/minio"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
var Params paramtable.BaseTable
func newMinIOKVClient(ctx context.Context, bucketName string) (*miniokv.MinIOKV, error) {
endPoint, _ := Params.Load("_MinioAddress")
accessKeyID, _ := Params.Load("minio.accessKeyID")
secretAccessKey, _ := Params.Load("minio.secretAccessKey")
useSSLStr, _ := Params.Load("minio.useSSL")
useSSL, _ := strconv.ParseBool(useSSLStr)
option := &miniokv.Option{
Address: endPoint,
AccessKeyID: accessKeyID,
SecretAccessKeyID: secretAccessKey,
UseSSL: useSSL,
BucketName: bucketName,
CreateBucket: true,
}
client, err := miniokv.NewMinIOKV(ctx, option)
return client, err
}
func TestMinIOKV(t *testing.T) {
Params.Init()
testBucket, err := Params.Load("minio.bucketName")
require.NoError(t, err)
configRoot, err := Params.Load("minio.rootPath")
require.NoError(t, err)
testMinIOKVRoot := path.Join(configRoot, "milvus-minio-ut-root")
t.Run("test load", func(t *testing.T) {
testLoadRoot := path.Join(testMinIOKVRoot, "test_load")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
testKV, err := newMinIOKVClient(ctx, testBucket)
require.NoError(t, err)
defer testKV.RemoveWithPrefix(testLoadRoot)
prepareTests := []struct {
key string
value string
}{
{"abc", "123"},
{"abcd", "1234"},
{"key_1", "111"},
{"key_2", "222"},
{"key_3", "333"},
}
for _, test := range prepareTests {
err = testKV.Save(path.Join(testLoadRoot, test.key), test.value)
require.NoError(t, err)
}
loadTests := []struct {
isvalid bool
loadKey string
expectedValue string
description string
}{
{true, "abc", "123", "load valid key abc"},
{true, "abcd", "1234", "load valid key abcd"},
{true, "key_1", "111", "load valid key key_1"},
{true, "key_2", "222", "load valid key key_2"},
{true, "key_3", "333", "load valid key key_3"},
{false, "key_not_exist", "", "load invalid key key_not_exist"},
{false, "/", "", "load leading slash"},
}
for _, test := range loadTests {
t.Run(test.description, func(t *testing.T) {
if test.isvalid {
got, err := testKV.Load(path.Join(testLoadRoot, test.loadKey))
assert.NoError(t, err)
assert.Equal(t, test.expectedValue, got)
} else {
if test.loadKey == "/" {
got, err := testKV.Load(test.loadKey)
assert.Error(t, err)
assert.Empty(t, got)
return
}
got, err := testKV.Load(path.Join(testLoadRoot, test.loadKey))
assert.Error(t, err)
assert.Empty(t, got)
value, err := testKV.LoadBytes(path.Join(testLoadRoot, test.loadKey))
assert.Error(t, err)
assert.Nil(t, value)
}
})
}
loadWithPrefixTests := []struct {
isvalid bool
prefix string
expectedValue []string
description string
}{
{true, "abc", []string{"123", "1234"}, "load with valid prefix abc"},
{true, "key_", []string{"111", "222", "333"}, "load with valid prefix key_"},
{true, "prefix", []string{}, "load with valid but not exist prefix prefix"},
}
for _, test := range loadWithPrefixTests {
t.Run(test.description, func(t *testing.T) {
gotk, gotv, err := testKV.LoadWithPrefix(path.Join(testLoadRoot, test.prefix))
assert.NoError(t, err)
assert.Equal(t, len(test.expectedValue), len(gotk))
assert.Equal(t, len(test.expectedValue), len(gotv))
assert.ElementsMatch(t, test.expectedValue, gotv)
keys, values, err := testKV.LoadBytesWithPrefix(path.Join(testLoadRoot, test.prefix))
assert.NoError(t, err)
assert.Equal(t, len(test.expectedValue), len(keys))
assert.Equal(t, len(test.expectedValue), len(values))
expectedValuesBytes := make([][]byte, 0)
for _, value := range test.expectedValue {
expectedValuesBytes = append(expectedValuesBytes, []byte(value))
}
assert.ElementsMatch(t, expectedValuesBytes, values)
})
}
multiLoadTests := []struct {
isvalid bool
multiKeys []string
expectedValue []string
description string
}{
{false, []string{"key_1", "key_not_exist"}, nil, "multiload 1 exist 1 not"},
{true, []string{"abc", "key_3"}, []string{"123", "333"}, "multiload 2 exist"},
}
for _, test := range multiLoadTests {
t.Run(test.description, func(t *testing.T) {
for i := range test.multiKeys {
test.multiKeys[i] = path.Join(testLoadRoot, test.multiKeys[i])
}
if test.isvalid {
got, err := testKV.MultiLoad(test.multiKeys)
assert.NoError(t, err)
assert.Equal(t, test.expectedValue, got)
} else {
got, err := testKV.MultiLoad(test.multiKeys)
assert.Error(t, err)
assert.Equal(t, test.expectedValue, got)
value, err := testKV.MultiLoadBytes(test.multiKeys)
assert.Error(t, err)
assert.Nil(t, value)
}
})
}
})
t.Run("test MultiSave", func(t *testing.T) {
testMultiSaveRoot := path.Join(testMinIOKVRoot, "test_multisave")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
testKV, err := newMinIOKVClient(ctx, testBucket)
assert.Nil(t, err)
defer testKV.RemoveWithPrefix(testMultiSaveRoot)
err = testKV.Save(path.Join(testMultiSaveRoot, "key_1"), "111")
assert.Nil(t, err)
kvs := map[string]string{
path.Join(testMultiSaveRoot, "key_1"): "123",
path.Join(testMultiSaveRoot, "key_2"): "456",
}
err = testKV.MultiSave(kvs)
assert.Nil(t, err)
for k, v := range kvs {
val, err := testKV.Load(k)
assert.Nil(t, err)
assert.Equal(t, v, val)
}
bytesKvs := map[string][]byte{
path.Join(testMultiSaveRoot, "key_1"): {0x12, 0x34},
path.Join(testMultiSaveRoot, "key_2"): {0x56, 0x78},
}
err = testKV.MultiSaveBytes(bytesKvs)
assert.NoError(t, err)
for k, v := range bytesKvs {
val, err := testKV.LoadBytes(k)
assert.Nil(t, err)
assert.Equal(t, v, val)
}
})
t.Run("test Remove", func(t *testing.T) {
testRemoveRoot := path.Join(testMinIOKVRoot, "test_remove")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
testKV, err := newMinIOKVClient(ctx, testBucket)
assert.Nil(t, err)
defer testKV.RemoveWithPrefix(testRemoveRoot)
prepareTests := []struct {
k string
v string
}{
{"key_1", "123"},
{"key_2", "456"},
{"mkey_1", "111"},
{"mkey_2", "222"},
{"mkey_3", "333"},
}
for _, test := range prepareTests {
k := path.Join(testRemoveRoot, test.k)
err = testKV.Save(k, test.v)
require.NoError(t, err)
}
removeTests := []struct {
removeKey string
valueBeforeRemove string
description string
}{
{"key_1", "123", "remove key_1"},
{"key_2", "456", "remove key_2"},
}
for _, test := range removeTests {
t.Run(test.description, func(t *testing.T) {
k := path.Join(testRemoveRoot, test.removeKey)
v, err := testKV.Load(k)
require.NoError(t, err)
require.Equal(t, test.valueBeforeRemove, v)
err = testKV.Remove(k)
assert.NoError(t, err)
exist := testKV.Exist(k)
assert.False(t, exist)
v, err = testKV.Load(k)
require.Error(t, err)
require.Empty(t, v)
})
}
multiRemoveTest := []string{
path.Join(testRemoveRoot, "mkey_1"),
path.Join(testRemoveRoot, "mkey_2"),
path.Join(testRemoveRoot, "mkey_3"),
}
lv, err := testKV.MultiLoad(multiRemoveTest)
require.NoError(t, err)
require.ElementsMatch(t, []string{"111", "222", "333"}, lv)
err = testKV.MultiRemove(multiRemoveTest)
assert.NoError(t, err)
for _, k := range multiRemoveTest {
v, err := testKV.Load(k)
assert.Error(t, err)
assert.Empty(t, v)
}
})
t.Run("test LoadPartial", func(t *testing.T) {
testLoadPartialRoot := path.Join(testMinIOKVRoot, "load_partial")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
testKV, err := newMinIOKVClient(ctx, testBucket)
require.NoError(t, err)
defer testKV.RemoveWithPrefix(testLoadPartialRoot)
key := path.Join(testLoadPartialRoot, "TestMinIOKV_LoadPartial_key")
value := "TestMinIOKV_LoadPartial_value"
err = testKV.Save(key, value)
assert.NoError(t, err)
var start, end int64
var partial []byte
start, end = 1, 2
partial, err = testKV.LoadPartial(key, start, end)
assert.NoError(t, err)
assert.ElementsMatch(t, partial, []byte(value[start:end]))
start, end = 0, int64(len(value))
partial, err = testKV.LoadPartial(key, start, end)
assert.NoError(t, err)
assert.ElementsMatch(t, partial, []byte(value[start:end]))
// error case
start, end = 5, 3
_, err = testKV.LoadPartial(key, start, end)
assert.Error(t, err)
start, end = 1, 1
_, err = testKV.LoadPartial(key, start, end)
assert.Error(t, err)
start, end = -1, 1
_, err = testKV.LoadPartial(key, start, end)
assert.Error(t, err)
start, end = 1, -1
_, err = testKV.LoadPartial(key, start, end)
assert.Error(t, err)
err = testKV.Remove(key)
assert.NoError(t, err)
start, end = 1, 2
_, err = testKV.LoadPartial(key, start, end)
assert.Error(t, err)
})
t.Run("test FGetObject", func(t *testing.T) {
testPath := "/tmp/milvus/data"
testFGetObjectRoot := path.Join(testMinIOKVRoot, "fget_object")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
testKV, err := newMinIOKVClient(ctx, testBucket)
require.Nil(t, err)
defer testKV.RemoveWithPrefix(testFGetObjectRoot)
name1 := path.Join(testFGetObjectRoot, "31280791048324/4325023534/53443534/key_1")
value1 := "123"
err = testKV.Save(name1, value1)
assert.Nil(t, err)
name2 := path.Join(testFGetObjectRoot, "312895849354/31205934503459/18948129301/key_2")
value2 := "333"
err = testKV.Save(name2, value2)
assert.Nil(t, err)
err = testKV.FGetObject(name1, testPath)
assert.Nil(t, err)
err = testKV.FGetObject(name2, testPath)
assert.Nil(t, err)
err = testKV.FGetObject("fail", testPath)
assert.NotNil(t, err)
file1, err := os.Open(testPath + name1)
assert.Nil(t, err)
content1, err := ioutil.ReadAll(file1)
assert.Nil(t, err)
assert.Equal(t, value1, string(content1))
defer file1.Close()
defer os.Remove(testPath + name1)
file2, err := os.Open(testPath + name2)
assert.Nil(t, err)
content2, err := ioutil.ReadAll(file2)
assert.Nil(t, err)
assert.Equal(t, value2, string(content2))
defer file2.Close()
defer os.Remove(testPath + name2)
})
t.Run("test FGetObjects", func(t *testing.T) {
testPath := "/tmp/milvus/data"
testFGetObjectsRoot := path.Join(testMinIOKVRoot, "fget_objects")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
bucketName := "fantastic-tech-test"
testKV, err := newMinIOKVClient(ctx, bucketName)
require.NoError(t, err)
defer testKV.RemoveWithPrefix(testFGetObjectsRoot)
name1 := path.Join(testFGetObjectsRoot, "31280791048324/4325023534/53443534/key_1")
value1 := "123"
err = testKV.Save(name1, value1)
assert.Nil(t, err)
name2 := path.Join(testFGetObjectsRoot, "312895849354/31205934503459/18948129301/key_2")
value2 := "333"
err = testKV.Save(name2, value2)
assert.Nil(t, err)
err = testKV.FGetObjects([]string{name1, name2}, testPath)
assert.Nil(t, err)
err = testKV.FGetObjects([]string{"fail1", "fail2"}, testPath)
assert.NotNil(t, err)
file1, err := os.Open(testPath + name1)
assert.Nil(t, err)
content1, err := ioutil.ReadAll(file1)
assert.Nil(t, err)
assert.Equal(t, value1, string(content1))
defer file1.Close()
defer os.Remove(testPath + name1)
file2, err := os.Open(testPath + name2)
assert.Nil(t, err)
content2, err := ioutil.ReadAll(file2)
assert.Nil(t, err)
assert.Equal(t, value2, string(content2))
defer file2.Close()
defer os.Remove(testPath + name2)
})
t.Run("test GetSize", func(t *testing.T) {
testGetSizeRoot := path.Join(testMinIOKVRoot, "get_size")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
testKV, err := newMinIOKVClient(ctx, testBucket)
require.NoError(t, err)
defer testKV.RemoveWithPrefix(testGetSizeRoot)
key := path.Join(testGetSizeRoot, "TestMinIOKV_GetSize_key")
value := "TestMinIOKV_GetSize_value"
err = testKV.Save(key, value)
assert.NoError(t, err)
size, err := testKV.GetSize(key)
assert.NoError(t, err)
assert.Equal(t, size, int64(len(value)))
key2 := path.Join(testGetSizeRoot, "TestMemoryKV_GetSize_key2")
size, err = testKV.GetSize(key2)
assert.Error(t, err)
assert.Equal(t, int64(0), size)
})
}
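
The simplest of the deleted cases port almost mechanically to the new API. A sketch under the same testify conventions, using the LocalChunkManager constructor seen elsewhere in this commit; the test name and root path are assumptions:

func TestChunkManagerReadWrite(t *testing.T) {
	cm := storage.NewLocalChunkManager(storage.RootPath("/tmp/milvus_test/chunk_manager"))
	defer cm.RemoveWithPrefix("")

	key := path.Join("test_load", "abc")
	require.NoError(t, cm.Write(key, []byte("123")))

	got, err := cm.Read(key)
	assert.NoError(t, err)
	assert.Equal(t, []byte("123"), got)

	// a missing key should surface as an error, as Load did before
	_, err = cm.Read(path.Join("test_load", "key_not_exist"))
	assert.Error(t, err)
}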

View File

@ -291,7 +291,7 @@ func genSimpleIndexParams() indexParam {
return indexParams
}
func generateIndex(indexBuildID UniqueID, dataKv kv.DataKV) ([]string, error) {
func generateIndex(indexBuildID UniqueID, cm storage.ChunkManager) ([]string, error) {
indexParams := genSimpleIndexParams()
var indexParamsKV []*commonpb.KeyValuePair
@ -338,7 +338,7 @@ func generateIndex(indexBuildID UniqueID, dataKv kv.DataKV) ([]string, error) {
for _, index := range serializedIndexBlobs {
p := strconv.Itoa(int(indexBuildID)) + "/" + index.Key
indexPaths = append(indexPaths, p)
err := dataKv.Save(p, string(index.Value))
err := cm.Write(p, index.Value)
if err != nil {
return nil, err
}
@ -347,13 +347,13 @@ func generateIndex(indexBuildID UniqueID, dataKv kv.DataKV) ([]string, error) {
return indexPaths, nil
}
func generateIndexFileInfo(indexBuildIDs []int64, dataKV kv.DataKV) ([]*indexpb.IndexFilePathInfo, error) {
func generateIndexFileInfo(indexBuildIDs []int64, cm storage.ChunkManager) ([]*indexpb.IndexFilePathInfo, error) {
schema := genDefaultCollectionSchema(false)
sizePerRecord, _ := typeutil.EstimateSizePerRecord(schema)
var indexInfos []*indexpb.IndexFilePathInfo
for _, buildID := range indexBuildIDs {
indexPaths, err := generateIndex(buildID, dataKV)
indexPaths, err := generateIndex(buildID, cm)
if err != nil {
return nil, err
}
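
In the mocks this pairs naturally with a local chunk manager; a usage sketch with an assumed root path, mirroring how the index coordinator mock below wires it up:

cm := storage.NewLocalChunkManager(storage.RootPath("/tmp/milvus_test/mock_index"))
defer cm.RemoveWithPrefix("")

// generateIndexFileInfo builds and persists index files through cm.
infos, err := generateIndexFileInfo(indexBuildIDs, cm)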

View File

@ -8,8 +8,6 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/kv"
minioKV "github.com/milvus-io/milvus/internal/kv/minio"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
@ -31,10 +29,10 @@ type globalMetaBroker struct {
dataCoord types.DataCoord
indexCoord types.IndexCoord
dataKV kv.DataKV
cm storage.ChunkManager
}
func newGlobalMetaBroker(ctx context.Context, rootCoord types.RootCoord, dataCoord types.DataCoord, indexCoord types.IndexCoord) (*globalMetaBroker, error) {
func newGlobalMetaBroker(ctx context.Context, rootCoord types.RootCoord, dataCoord types.DataCoord, indexCoord types.IndexCoord, cm storage.ChunkManager) (*globalMetaBroker, error) {
childCtx, cancel := context.WithCancel(ctx)
parser := &globalMetaBroker{
ctx: childCtx,
@ -42,21 +40,8 @@ func newGlobalMetaBroker(ctx context.Context, rootCoord types.RootCoord, dataCoo
rootCoord: rootCoord,
dataCoord: dataCoord,
indexCoord: indexCoord,
cm: cm,
}
option := &minioKV.Option{
Address: Params.MinioCfg.Address,
AccessKeyID: Params.MinioCfg.AccessKeyID,
SecretAccessKeyID: Params.MinioCfg.SecretAccessKey,
UseSSL: Params.MinioCfg.UseSSL,
CreateBucket: true,
BucketName: Params.MinioCfg.BucketName,
}
dataKV, err := minioKV.NewMinIOKV(childCtx, option)
if err != nil {
return nil, err
}
parser.dataKV = dataKV
return parser, nil
}
@ -239,7 +224,7 @@ func (broker *globalMetaBroker) parseIndexInfo(ctx context.Context, segmentID Un
for _, indexFilePath := range fieldPathInfo.IndexFilePaths {
// get index params when detecting indexParamPrefix
if path.Base(indexFilePath) == storage.IndexParamsKey {
indexPiece, err := broker.dataKV.Load(indexFilePath)
indexPiece, err := broker.cm.Read(indexFilePath)
if err != nil {
log.Error("load index params file failed",
zap.Int64("segmentID", segmentID),
@ -249,7 +234,7 @@ func (broker *globalMetaBroker) parseIndexInfo(ctx context.Context, segmentID Un
zap.Error(err))
return err
}
_, indexParams, indexName, indexID, err := indexCodec.Deserialize([]*storage.Blob{{Key: storage.IndexParamsKey, Value: []byte(indexPiece)}})
_, indexParams, indexName, indexID, err := indexCodec.Deserialize([]*storage.Blob{{Key: storage.IndexParamsKey, Value: indexPiece}})
if err != nil {
log.Error("deserialize index params file failed",
zap.Int64("segmentID", segmentID),
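
The read-then-wrap step above is the same for every index-params file; a hypothetical helper (name assumed, not part of this commit) makes the byte-level flow explicit:

// readIndexParamsBlob is hypothetical; it wraps the raw bytes for the codec.
func readIndexParamsBlob(cm storage.ChunkManager, filePath string) (*storage.Blob, error) {
	indexPiece, err := cm.Read(filePath)
	if err != nil {
		return nil, err
	}
	return &storage.Blob{Key: storage.IndexParamsKey, Value: indexPiece}, nil
}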

View File

@ -20,9 +20,12 @@ import (
"context"
"testing"
"github.com/milvus-io/milvus/internal/storage"
"github.com/stretchr/testify/assert"
)
var globalMetaTestDir = "/tmp/milvus_test/global_meta"
func TestGlobalMetaBroker_RootCoord(t *testing.T) {
refreshParams()
ctx, cancel := context.WithCancel(context.Background())
@ -30,7 +33,9 @@ func TestGlobalMetaBroker_RootCoord(t *testing.T) {
rootCoord.createCollection(defaultCollectionID)
rootCoord.createPartition(defaultCollectionID, defaultPartitionID)
handler, err := newGlobalMetaBroker(ctx, rootCoord, nil, nil)
cm := storage.NewLocalChunkManager(storage.RootPath(globalMetaTestDir))
defer cm.RemoveWithPrefix("")
handler, err := newGlobalMetaBroker(ctx, rootCoord, nil, nil, cm)
assert.Nil(t, err)
t.Run("successCase", func(t *testing.T) {
@ -73,7 +78,9 @@ func TestGlobalMetaBroker_DataCoord(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
dataCoord := newDataCoordMock(ctx)
handler, err := newGlobalMetaBroker(ctx, nil, dataCoord, nil)
cm := storage.NewLocalChunkManager(storage.RootPath(globalMetaTestDir))
defer cm.RemoveWithPrefix("")
handler, err := newGlobalMetaBroker(ctx, nil, dataCoord, nil, cm)
assert.Nil(t, err)
t.Run("successCase", func(t *testing.T) {
@ -111,10 +118,12 @@ func TestGlobalMetaBroker_IndexCoord(t *testing.T) {
rootCoord.enableIndex = true
rootCoord.createCollection(defaultCollectionID)
rootCoord.createPartition(defaultCollectionID, defaultPartitionID)
indexCoord, err := newIndexCoordMock(ctx)
indexCoord, err := newIndexCoordMock(globalMetaTestDir)
assert.Nil(t, err)
handler, err := newGlobalMetaBroker(ctx, rootCoord, nil, indexCoord)
cm := storage.NewLocalChunkManager(storage.RootPath(globalMetaTestDir))
defer cm.RemoveWithPrefix("")
handler, err := newGlobalMetaBroker(ctx, rootCoord, nil, indexCoord, cm)
assert.Nil(t, err)
t.Run("successCase", func(t *testing.T) {

View File

@ -22,6 +22,7 @@ import (
"testing"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/storage"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/allocator"
@ -32,6 +33,8 @@ import (
"github.com/milvus-io/milvus/internal/util/tsoutil"
)
var indexCheckerTestDir = "/tmp/milvus_test/index_checker"
func TestReloadFromKV(t *testing.T) {
refreshParams()
baseCtx, cancel := context.WithCancel(context.Background())
@ -99,10 +102,12 @@ func TestCheckIndexLoop(t *testing.T) {
assert.Nil(t, err)
rootCoord := newRootCoordMock(ctx)
indexCoord, err := newIndexCoordMock(ctx)
indexCoord, err := newIndexCoordMock(indexCheckerTestDir)
assert.Nil(t, err)
rootCoord.enableIndex = true
broker, err := newGlobalMetaBroker(ctx, rootCoord, nil, indexCoord)
cm := storage.NewLocalChunkManager(storage.RootPath(indexCheckerTestDir))
defer cm.RemoveWithPrefix("")
broker, err := newGlobalMetaBroker(ctx, rootCoord, nil, indexCoord, cm)
assert.Nil(t, err)
segmentInfo := &querypb.SegmentInfo{
@ -168,12 +173,14 @@ func TestHandoffNotExistSegment(t *testing.T) {
rootCoord := newRootCoordMock(ctx)
rootCoord.enableIndex = true
indexCoord, err := newIndexCoordMock(ctx)
indexCoord, err := newIndexCoordMock(indexCheckerTestDir)
assert.Nil(t, err)
indexCoord.returnError = true
dataCoord := newDataCoordMock(ctx)
dataCoord.segmentState = commonpb.SegmentState_NotExist
broker, err := newGlobalMetaBroker(ctx, rootCoord, dataCoord, indexCoord)
cm := storage.NewLocalChunkManager(storage.RootPath(indexCheckerTestDir))
defer cm.RemoveWithPrefix("")
broker, err := newGlobalMetaBroker(ctx, rootCoord, dataCoord, indexCoord, cm)
assert.Nil(t, err)
meta.addCollection(defaultCollectionID, querypb.LoadType_LoadCollection, genDefaultCollectionSchema(false))

View File

@ -23,8 +23,6 @@ import (
"sync"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/kv"
minioKV "github.com/milvus-io/milvus/internal/kv/minio"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
@ -32,6 +30,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
@ -383,27 +382,15 @@ func (data *dataCoordMock) GetSegmentStates(ctx context.Context, req *datapb.Get
type indexCoordMock struct {
types.IndexCoord
dataKv kv.DataKV
chunkManager storage.ChunkManager
returnError bool
returnGrpcError bool
}
func newIndexCoordMock(ctx context.Context) (*indexCoordMock, error) {
option := &minioKV.Option{
Address: Params.MinioCfg.Address,
AccessKeyID: Params.MinioCfg.AccessKeyID,
SecretAccessKeyID: Params.MinioCfg.SecretAccessKey,
UseSSL: Params.MinioCfg.UseSSL,
BucketName: Params.MinioCfg.BucketName,
CreateBucket: true,
}
kv, err := minioKV.NewMinIOKV(context.Background(), option)
if err != nil {
return nil, err
}
func newIndexCoordMock(path string) (*indexCoordMock, error) {
cm := storage.NewLocalChunkManager(storage.RootPath(path))
return &indexCoordMock{
dataKv: kv,
chunkManager: cm,
}, nil
}
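
NewLocalChunkManager returns no error, so the constructor's error result is now always nil. A possible follow-up, not part of this commit, drops it and simplifies every call site:

// Hypothetical simplification: the error return carries no information now.
func newIndexCoordMock(path string) *indexCoordMock {
	return &indexCoordMock{
		chunkManager: storage.NewLocalChunkManager(storage.RootPath(path)),
	}
}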
@ -421,7 +408,7 @@ func (c *indexCoordMock) GetIndexFilePaths(ctx context.Context, req *indexpb.Get
}, nil
}
indexPathInfos, err := generateIndexFileInfo(req.IndexBuildIDs, c.dataKv)
indexPathInfos, err := generateIndexFileInfo(req.IndexBuildIDs, c.chunkManager)
if err != nil {
return &indexpb.GetIndexFilePathsResponse{
Status: &commonpb.Status{

View File

@ -38,6 +38,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/paramtable"
@ -98,7 +99,8 @@ type QueryCoord struct {
stateCode atomic.Value
msFactory msgstream.Factory
msFactory msgstream.Factory
chunkManager storage.ChunkManager
}
// Register register query service at etcd
@ -179,8 +181,21 @@ func (qc *QueryCoord) Init() error {
return
}
qc.chunkManager, initError = storage.NewMinioChunkManager(qc.loopCtx,
storage.Address(Params.MinioCfg.Address),
storage.AccessKeyID(Params.MinioCfg.AccessKeyID),
storage.SecretAccessKeyID(Params.MinioCfg.SecretAccessKey),
storage.UseSSL(Params.MinioCfg.UseSSL),
storage.BucketName(Params.MinioCfg.BucketName),
storage.CreateBucket(true))
if initError != nil {
log.Error("query coordinator init cluster failed", zap.Error(initError))
return
}
//init globalMetaBroker
qc.broker, initError = newGlobalMetaBroker(qc.loopCtx, qc.rootCoordClient, qc.dataCoordClient, qc.indexCoordClient)
qc.broker, initError = newGlobalMetaBroker(qc.loopCtx, qc.rootCoordClient, qc.dataCoordClient, qc.indexCoordClient, qc.chunkManager)
if initError != nil {
log.Error("query coordinator init globalMetaBroker failed", zap.Error(initError))
return

View File

@ -41,6 +41,8 @@ import (
"go.uber.org/zap"
)
var queryCoordTestDir = "/tmp/milvus_test/query_coord"
func setup() {
Params.Init()
}
@ -85,7 +87,7 @@ func startQueryCoord(ctx context.Context) (*QueryCoord, error) {
rootCoord.createPartition(defaultCollectionID, defaultPartitionID)
dataCoord := newDataCoordMock(ctx)
indexCoord, err := newIndexCoordMock(ctx)
indexCoord, err := newIndexCoordMock(queryCoordTestDir)
if err != nil {
return nil, err
}

View File

@ -30,7 +30,6 @@ import (
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/indexnode"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
minioKV "github.com/milvus-io/milvus/internal/kv/minio"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
@ -70,6 +69,8 @@ const (
defaultDMLChannel = "query-node-unittest-DML-0"
defaultDeltaChannel = "query-node-unittest-delta-channel-0"
defaultSubName = "query-node-unittest-sub-name-0"
defaultLocalStorage = "/tmp/milvus_test/querynode"
)
const (
@ -357,19 +358,7 @@ func generateIndex(segmentID UniqueID) ([]string, error) {
return nil, err
}
option := &minioKV.Option{
Address: Params.MinioCfg.Address,
AccessKeyID: Params.MinioCfg.AccessKeyID,
SecretAccessKeyID: Params.MinioCfg.SecretAccessKey,
UseSSL: Params.MinioCfg.UseSSL,
BucketName: Params.MinioCfg.BucketName,
CreateBucket: true,
}
kv, err := minioKV.NewMinIOKV(context.Background(), option)
if err != nil {
return nil, err
}
cm := storage.NewLocalChunkManager(storage.RootPath(defaultLocalStorage))
// save index via the chunk manager
binarySet, err := index.Serialize()
@ -399,7 +388,7 @@ func generateIndex(segmentID UniqueID) ([]string, error) {
for _, index := range serializedIndexBlobs {
p := strconv.Itoa(int(segmentID)) + "/" + index.Key
indexPaths = append(indexPaths, p)
err := kv.Save(p, string(index.Value))
err := cm.Write(p, index.Value)
if err != nil {
return nil, err
}
@ -436,19 +425,7 @@ func generateAndSaveIndex(segmentID UniqueID, msgLength int, indexType, metricTy
return nil, err
}
option := &minioKV.Option{
Address: Params.MinioCfg.Address,
AccessKeyID: Params.MinioCfg.AccessKeyID,
SecretAccessKeyID: Params.MinioCfg.SecretAccessKey,
UseSSL: Params.MinioCfg.UseSSL,
BucketName: Params.MinioCfg.BucketName,
CreateBucket: true,
}
kv, err := minioKV.NewMinIOKV(context.Background(), option)
if err != nil {
return nil, err
}
cm := storage.NewLocalChunkManager(storage.RootPath(defaultLocalStorage))
// save index via the chunk manager
binarySet, err := index.Serialize()
@ -478,7 +455,7 @@ func generateAndSaveIndex(segmentID UniqueID, msgLength int, indexType, metricTy
for _, index := range serializedIndexBlobs {
p := strconv.Itoa(int(segmentID)) + "/" + index.Key
indexPaths = append(indexPaths, p)
err := kv.Save(p, string(index.Value))
err := cm.Write(p, index.Value)
if err != nil {
return nil, err
}
@ -633,19 +610,6 @@ func genSimpleCollectionMeta() *etcdpb.CollectionMeta {
// ---------- unittest util functions ----------
// functions of third-party
func genMinioKV(ctx context.Context) (*minioKV.MinIOKV, error) {
option := &minioKV.Option{
Address: Params.MinioCfg.Address,
AccessKeyID: Params.MinioCfg.AccessKeyID,
SecretAccessKeyID: Params.MinioCfg.SecretAccessKey,
UseSSL: Params.MinioCfg.UseSSL,
BucketName: Params.MinioCfg.BucketName,
CreateBucket: true,
}
kv, err := minioKV.NewMinIOKV(ctx, option)
return kv, err
}
func genEtcdKV() (*etcdkv.EtcdKV, error) {
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
if err != nil {
@ -955,7 +919,7 @@ func saveBinLog(ctx context.Context,
}
log.Debug(".. [query node unittest] Saving bin logs to MinIO ..", zap.Int("number", len(binLogs)))
kvs := make(map[string]string, len(binLogs))
kvs := make(map[string][]byte, len(binLogs))
// write insert binlog
fieldBinlog := make([]*datapb.FieldBinlog, 0)
@ -967,7 +931,7 @@ func saveBinLog(ctx context.Context,
}
key := JoinIDPath(collectionID, partitionID, segmentID, fieldID)
kvs[key] = string(blob.Value[:])
kvs[key] = blob.Value
fieldBinlog = append(fieldBinlog, &datapb.FieldBinlog{
FieldID: fieldID,
Binlogs: []*datapb.Binlog{{LogPath: key}},
@ -975,11 +939,8 @@ func saveBinLog(ctx context.Context,
}
log.Debug("[query node unittest] save binlog file to MinIO/S3")
kv, err := genMinioKV(ctx)
if err != nil {
return nil, err
}
err = kv.MultiSave(kvs)
cm := storage.NewLocalChunkManager(storage.RootPath(defaultLocalStorage))
err = cm.MultiWrite(kvs)
return fieldBinlog, err
}
@ -1219,7 +1180,8 @@ func genSimpleSegmentLoaderWithMqFactory(ctx context.Context, historicalReplica
if err != nil {
return nil, err
}
return newSegmentLoader(ctx, historicalReplica, streamingReplica, kv, factory), nil
cm := storage.NewLocalChunkManager(storage.RootPath(defaultLocalStorage))
return newSegmentLoader(historicalReplica, streamingReplica, kv, cm, factory), nil
}
func genSimpleSegmentLoader(ctx context.Context, historicalReplica ReplicaInterface, streamingReplica ReplicaInterface) (*segmentLoader, error) {

View File

@ -45,12 +45,12 @@ import (
"unsafe"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util"
"github.com/milvus-io/milvus/internal/util/paramtable"
@ -115,8 +115,8 @@ type QueryNode struct {
eventCh <-chan *sessionutil.SessionEvent
sessionManager *SessionManager
minioKV kv.BaseKV // minio minioKV
etcdKV *etcdkv.EtcdKV
chunkManager storage.ChunkManager
etcdKV *etcdkv.EtcdKV
}
// NewQueryNode will return a QueryNode with abnormal state.
@ -272,6 +272,20 @@ func (node *QueryNode) Init() error {
return
}
node.chunkManager, err = storage.NewMinioChunkManager(node.queryNodeLoopCtx,
storage.Address(Params.MinioCfg.Address),
storage.AccessKeyID(Params.MinioCfg.AccessKeyID),
storage.SecretAccessKeyID(Params.MinioCfg.SecretAccessKey),
storage.UseSSL(Params.MinioCfg.UseSSL),
storage.BucketName(Params.MinioCfg.BucketName),
storage.CreateBucket(true))
if err != nil {
log.Error("QueryNode init session failed", zap.Error(err))
initError = err
return
}
node.etcdKV = etcdkv.NewEtcdKV(node.etcdCli, Params.EtcdCfg.MetaRootPath)
log.Debug("queryNode try to connect etcd success", zap.Any("MetaRootPath", Params.EtcdCfg.MetaRootPath))
node.tSafeReplica = newTSafeReplica()
@ -290,10 +304,11 @@ func (node *QueryNode) Init() error {
node.tSafeReplica,
)
node.loader = newSegmentLoader(node.queryNodeLoopCtx,
node.loader = newSegmentLoader(
node.historical.replica,
node.streaming.replica,
node.etcdKV,
node.chunkManager,
node.msFactory)
//node.statsService = newStatsService(node.queryNodeLoopCtx, node.historical.replica, node.msFactory)

View File

@ -28,6 +28,7 @@ import (
"time"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/storage"
"github.com/stretchr/testify/assert"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
@ -208,7 +209,8 @@ func newQueryNodeMock() *QueryNode {
svr.streaming = newStreaming(ctx, streamingReplica, msFactory, etcdKV, tsReplica)
svr.dataSyncService = newDataSyncService(ctx, svr.streaming.replica, svr.historical.replica, tsReplica, msFactory)
svr.statsService = newStatsService(ctx, svr.historical.replica, msFactory)
svr.loader = newSegmentLoader(ctx, svr.historical.replica, svr.streaming.replica, etcdKV, msgstream.NewPmsFactory())
svr.chunkManager = storage.NewLocalChunkManager(storage.RootPath(defaultLocalStorage))
svr.loader = newSegmentLoader(svr.historical.replica, svr.streaming.replica, etcdKV, svr.chunkManager, msgstream.NewPmsFactory())
svr.etcdKV = etcdKV
return svr

View File

@ -29,9 +29,7 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
minioKV "github.com/milvus-io/milvus/internal/kv/minio"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/mq/msgstream"
@ -55,8 +53,8 @@ type segmentLoader struct {
dataCoord types.DataCoord
minioKV kv.DataKV // minio minioKV
etcdKV *etcdkv.EtcdKV
cm storage.ChunkManager // chunk manager for object storage access
etcdKV *etcdkv.EtcdKV
factory msgstream.Factory
}
@ -263,13 +261,13 @@ func (loader *segmentLoader) loadFiledBinlogData(segment *Segment, fieldBinlogs
blobs := make([]*storage.Blob, 0)
for _, fieldBinlog := range fieldBinlogs {
for _, path := range fieldBinlog.Binlogs {
binLog, err := loader.minioKV.Load(path.GetLogPath())
binLog, err := loader.cm.Read(path.GetLogPath())
if err != nil {
return err
}
blob := &storage.Blob{
Key: path.GetLogPath(),
Value: []byte(binLog),
Value: binLog,
}
blobs = append(blobs, blob)
}
@ -324,13 +322,13 @@ func (loader *segmentLoader) loadVecFieldIndexData(segment *Segment, indexInfo *
indexCodec := storage.NewIndexFileBinlogCodec()
for _, p := range indexInfo.IndexFilePaths {
log.Debug("load index file", zap.String("path", p))
indexPiece, err := loader.minioKV.Load(p)
indexPiece, err := loader.cm.Read(p)
if err != nil {
return err
}
if path.Base(p) != storage.IndexParamsKey {
data, _, _, _, err := indexCodec.Deserialize([]*storage.Blob{{Key: path.Base(p), Value: []byte(indexPiece)}})
data, _, _, _, err := indexCodec.Deserialize([]*storage.Blob{{Key: path.Base(p), Value: indexPiece}})
if err != nil {
return err
}
@ -449,13 +447,13 @@ func (loader *segmentLoader) loadSegmentBloomFilter(segment *Segment, binlogPath
return nil
}
values, err := loader.minioKV.MultiLoad(binlogPaths)
values, err := loader.cm.MultiRead(binlogPaths)
if err != nil {
return err
}
blobs := make([]*storage.Blob, 0)
for i := 0; i < len(values); i++ {
blobs = append(blobs, &storage.Blob{Value: []byte(values[i])})
blobs = append(blobs, &storage.Blob{Value: values[i]})
}
stats, err := storage.DeserializeStats(blobs)
@ -480,13 +478,13 @@ func (loader *segmentLoader) loadDeltaLogs(segment *Segment, deltaLogs []*datapb
var blobs []*storage.Blob
for _, deltaLog := range deltaLogs {
for _, log := range deltaLog.GetBinlogs() {
value, err := loader.minioKV.Load(log.GetLogPath())
value, err := loader.cm.Read(log.GetLogPath())
if err != nil {
return err
}
blob := &storage.Blob{
Key: log.GetLogPath(),
Value: []byte(value),
Value: value,
}
blobs = append(blobs, blob)
}
@ -678,31 +676,19 @@ func (loader *segmentLoader) checkSegmentSize(collectionID UniqueID, segmentLoad
return nil
}
func newSegmentLoader(ctx context.Context,
func newSegmentLoader(
historicalReplica ReplicaInterface,
streamingReplica ReplicaInterface,
etcdKV *etcdkv.EtcdKV,
cm storage.ChunkManager,
factory msgstream.Factory) *segmentLoader {
option := &minioKV.Option{
Address: Params.MinioCfg.Address,
AccessKeyID: Params.MinioCfg.AccessKeyID,
SecretAccessKeyID: Params.MinioCfg.SecretAccessKey,
UseSSL: Params.MinioCfg.UseSSL,
BucketName: Params.MinioCfg.BucketName,
CreateBucket: true,
}
client, err := minioKV.NewMinIOKV(ctx, option)
if err != nil {
panic(err)
}
return &segmentLoader{
historicalReplica: historicalReplica,
streamingReplica: streamingReplica,
minioKV: client,
etcdKV: etcdKV,
cm: cm,
etcdKV: etcdKV,
factory: factory,
}
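
Construction can no longer panic on an unreachable MinIO, so connectivity problems now surface at read time instead. If per-read resilience is wanted, the util/retry helper already used in this commit could wrap the chunk manager calls; a hypothetical sketch (readWithRetry and the attempt count are assumptions):

// readWithRetry retries a single chunk manager read; retry.Do and
// retry.Attempts follow the usage seen elsewhere in this commit.
func readWithRetry(ctx context.Context, loader *segmentLoader, p string) (data []byte, err error) {
	err = retry.Do(ctx, func() error {
		data, err = loader.cm.Read(p)
		return err
	}, retry.Attempts(3))
	return data, err
}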