mirror of https://github.com/milvus-io/milvus.git
Remove dup mock_kv.go and mock_txn_kv.go (#20267)
Replace them with mockery generated mocks in kv/mocks See also: #19289 Signed-off-by: yangxuan <xuan.yang@zilliz.com> Signed-off-by: yangxuan <xuan.yang@zilliz.com>pull/20292/head
parent
cb2591d1fe
commit
22fb1d3b93
|
@ -45,8 +45,7 @@ func Test_garbageCollector_basic(t *testing.T) {
|
||||||
cli, _, _, _, _, err := initUtOSSEnv(bucketName, rootPath, 0)
|
cli, _, _, _, _, err := initUtOSSEnv(bucketName, rootPath, 0)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
mockAllocator := newMockAllocator()
|
meta, err := newMemoryMeta()
|
||||||
meta, err := newMemoryMeta(mockAllocator)
|
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
|
|
||||||
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
|
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
|
||||||
|
@ -108,8 +107,7 @@ func Test_garbageCollector_scan(t *testing.T) {
|
||||||
cli, inserts, stats, delta, others, err := initUtOSSEnv(bucketName, rootPath, 4)
|
cli, inserts, stats, delta, others, err := initUtOSSEnv(bucketName, rootPath, 4)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
mockAllocator := newMockAllocator()
|
meta, err := newMemoryMeta()
|
||||||
meta, err := newMemoryMeta(mockAllocator)
|
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
|
|
||||||
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
|
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
|
||||||
|
|
|
@ -243,7 +243,7 @@ func TestMeta_Basic(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
mockAllocator := newMockAllocator()
|
mockAllocator := newMockAllocator()
|
||||||
meta, err := newMemoryMeta(mockAllocator)
|
meta, err := newMemoryMeta()
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
|
|
||||||
testSchema := newTestSchema()
|
testSchema := newTestSchema()
|
||||||
|
@ -489,8 +489,7 @@ func TestMeta_Basic(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGetUnFlushedSegments(t *testing.T) {
|
func TestGetUnFlushedSegments(t *testing.T) {
|
||||||
mockAllocator := newMockAllocator()
|
meta, err := newMemoryMeta()
|
||||||
meta, err := newMemoryMeta(mockAllocator)
|
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
s1 := &datapb.SegmentInfo{
|
s1 := &datapb.SegmentInfo{
|
||||||
ID: 0,
|
ID: 0,
|
||||||
|
|
|
@ -38,7 +38,7 @@ import (
|
||||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
func newMemoryMeta(allocator allocator) (*meta, error) {
|
func newMemoryMeta() (*meta, error) {
|
||||||
memoryKV := memkv.NewMemoryKV()
|
memoryKV := memkv.NewMemoryKV()
|
||||||
return newMeta(context.TODO(), memoryKV, "")
|
return newMeta(context.TODO(), memoryKV, "")
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,7 +35,7 @@ func TestManagerOptions(t *testing.T) {
|
||||||
// ctx := context.Background()
|
// ctx := context.Background()
|
||||||
Params.Init()
|
Params.Init()
|
||||||
mockAllocator := newMockAllocator()
|
mockAllocator := newMockAllocator()
|
||||||
meta, err := newMemoryMeta(mockAllocator)
|
meta, err := newMemoryMeta()
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
segmentManager := newSegmentManager(meta, mockAllocator, nil)
|
segmentManager := newSegmentManager(meta, mockAllocator, nil)
|
||||||
|
|
||||||
|
@ -96,7 +96,7 @@ func TestAllocSegment(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
Params.Init()
|
Params.Init()
|
||||||
mockAllocator := newMockAllocator()
|
mockAllocator := newMockAllocator()
|
||||||
meta, err := newMemoryMeta(mockAllocator)
|
meta, err := newMemoryMeta()
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
segmentManager := newSegmentManager(meta, mockAllocator, nil)
|
segmentManager := newSegmentManager(meta, mockAllocator, nil)
|
||||||
|
|
||||||
|
@ -137,7 +137,7 @@ func TestAllocSegmentForImport(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
Params.Init()
|
Params.Init()
|
||||||
mockAllocator := newMockAllocator()
|
mockAllocator := newMockAllocator()
|
||||||
meta, err := newMemoryMeta(mockAllocator)
|
meta, err := newMemoryMeta()
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
ms := newMockRootCoordService()
|
ms := newMockRootCoordService()
|
||||||
segmentManager := newSegmentManager(meta, mockAllocator, ms)
|
segmentManager := newSegmentManager(meta, mockAllocator, ms)
|
||||||
|
@ -185,7 +185,7 @@ func TestLoadSegmentsFromMeta(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
Params.Init()
|
Params.Init()
|
||||||
mockAllocator := newMockAllocator()
|
mockAllocator := newMockAllocator()
|
||||||
meta, err := newMemoryMeta(mockAllocator)
|
meta, err := newMemoryMeta()
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
|
|
||||||
schema := newTestSchema()
|
schema := newTestSchema()
|
||||||
|
@ -235,7 +235,7 @@ func TestLoadSegmentsFromMeta(t *testing.T) {
|
||||||
func TestSaveSegmentsToMeta(t *testing.T) {
|
func TestSaveSegmentsToMeta(t *testing.T) {
|
||||||
Params.Init()
|
Params.Init()
|
||||||
mockAllocator := newMockAllocator()
|
mockAllocator := newMockAllocator()
|
||||||
meta, err := newMemoryMeta(mockAllocator)
|
meta, err := newMemoryMeta()
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
|
|
||||||
schema := newTestSchema()
|
schema := newTestSchema()
|
||||||
|
@ -257,7 +257,7 @@ func TestSaveSegmentsToMeta(t *testing.T) {
|
||||||
func TestSaveSegmentsToMetaWithSpecificSegments(t *testing.T) {
|
func TestSaveSegmentsToMetaWithSpecificSegments(t *testing.T) {
|
||||||
Params.Init()
|
Params.Init()
|
||||||
mockAllocator := newMockAllocator()
|
mockAllocator := newMockAllocator()
|
||||||
meta, err := newMemoryMeta(mockAllocator)
|
meta, err := newMemoryMeta()
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
|
|
||||||
schema := newTestSchema()
|
schema := newTestSchema()
|
||||||
|
@ -279,7 +279,7 @@ func TestSaveSegmentsToMetaWithSpecificSegments(t *testing.T) {
|
||||||
func TestDropSegment(t *testing.T) {
|
func TestDropSegment(t *testing.T) {
|
||||||
Params.Init()
|
Params.Init()
|
||||||
mockAllocator := newMockAllocator()
|
mockAllocator := newMockAllocator()
|
||||||
meta, err := newMemoryMeta(mockAllocator)
|
meta, err := newMemoryMeta()
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
|
|
||||||
schema := newTestSchema()
|
schema := newTestSchema()
|
||||||
|
@ -302,7 +302,7 @@ func TestDropSegment(t *testing.T) {
|
||||||
func TestAllocRowsLargerThanOneSegment(t *testing.T) {
|
func TestAllocRowsLargerThanOneSegment(t *testing.T) {
|
||||||
Params.Init()
|
Params.Init()
|
||||||
mockAllocator := newMockAllocator()
|
mockAllocator := newMockAllocator()
|
||||||
meta, err := newMemoryMeta(mockAllocator)
|
meta, err := newMemoryMeta()
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
|
|
||||||
schema := newTestSchema()
|
schema := newTestSchema()
|
||||||
|
@ -324,7 +324,7 @@ func TestAllocRowsLargerThanOneSegment(t *testing.T) {
|
||||||
func TestExpireAllocation(t *testing.T) {
|
func TestExpireAllocation(t *testing.T) {
|
||||||
Params.Init()
|
Params.Init()
|
||||||
mockAllocator := newMockAllocator()
|
mockAllocator := newMockAllocator()
|
||||||
meta, err := newMemoryMeta(mockAllocator)
|
meta, err := newMemoryMeta()
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
|
|
||||||
schema := newTestSchema()
|
schema := newTestSchema()
|
||||||
|
@ -367,7 +367,7 @@ func TestGetFlushableSegments(t *testing.T) {
|
||||||
t.Run("get flushable segments between small interval", func(t *testing.T) {
|
t.Run("get flushable segments between small interval", func(t *testing.T) {
|
||||||
Params.Init()
|
Params.Init()
|
||||||
mockAllocator := newMockAllocator()
|
mockAllocator := newMockAllocator()
|
||||||
meta, err := newMemoryMeta(mockAllocator)
|
meta, err := newMemoryMeta()
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
|
|
||||||
schema := newTestSchema()
|
schema := newTestSchema()
|
||||||
|
@ -413,7 +413,7 @@ func TestTryToSealSegment(t *testing.T) {
|
||||||
t.Run("normal seal with segment policies", func(t *testing.T) {
|
t.Run("normal seal with segment policies", func(t *testing.T) {
|
||||||
Params.Init()
|
Params.Init()
|
||||||
mockAllocator := newMockAllocator()
|
mockAllocator := newMockAllocator()
|
||||||
meta, err := newMemoryMeta(mockAllocator)
|
meta, err := newMemoryMeta()
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
|
|
||||||
schema := newTestSchema()
|
schema := newTestSchema()
|
||||||
|
@ -438,7 +438,7 @@ func TestTryToSealSegment(t *testing.T) {
|
||||||
t.Run("normal seal with channel seal policies", func(t *testing.T) {
|
t.Run("normal seal with channel seal policies", func(t *testing.T) {
|
||||||
Params.Init()
|
Params.Init()
|
||||||
mockAllocator := newMockAllocator()
|
mockAllocator := newMockAllocator()
|
||||||
meta, err := newMemoryMeta(mockAllocator)
|
meta, err := newMemoryMeta()
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
|
|
||||||
schema := newTestSchema()
|
schema := newTestSchema()
|
||||||
|
@ -463,7 +463,7 @@ func TestTryToSealSegment(t *testing.T) {
|
||||||
t.Run("normal seal with both segment & channel seal policy", func(t *testing.T) {
|
t.Run("normal seal with both segment & channel seal policy", func(t *testing.T) {
|
||||||
Params.Init()
|
Params.Init()
|
||||||
mockAllocator := newMockAllocator()
|
mockAllocator := newMockAllocator()
|
||||||
meta, err := newMemoryMeta(mockAllocator)
|
meta, err := newMemoryMeta()
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
|
|
||||||
schema := newTestSchema()
|
schema := newTestSchema()
|
||||||
|
|
|
@ -44,7 +44,6 @@ import (
|
||||||
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
|
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
|
||||||
"github.com/milvus-io/milvus-proto/go-api/schemapb"
|
"github.com/milvus-io/milvus-proto/go-api/schemapb"
|
||||||
"github.com/milvus-io/milvus/internal/common"
|
"github.com/milvus-io/milvus/internal/common"
|
||||||
"github.com/milvus-io/milvus/internal/kv"
|
|
||||||
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
|
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
|
||||||
"github.com/milvus-io/milvus/internal/log"
|
"github.com/milvus-io/milvus/internal/log"
|
||||||
"github.com/milvus-io/milvus/internal/mq/msgstream"
|
"github.com/milvus-io/milvus/internal/mq/msgstream"
|
||||||
|
@ -2809,18 +2808,6 @@ func TestGetFlushState(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
type mockTxnKVext struct {
|
|
||||||
kv.MockTxnKV
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *mockTxnKVext) LoadWithPrefix(prefix string) ([]string, []string, error) {
|
|
||||||
return []string{}, []string{}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *mockTxnKVext) MultiSave(kvs map[string]string) error {
|
|
||||||
return errors.New("(testing only) injected error")
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestDataCoordServer_SetSegmentState(t *testing.T) {
|
func TestDataCoordServer_SetSegmentState(t *testing.T) {
|
||||||
t.Run("normal case", func(t *testing.T) {
|
t.Run("normal case", func(t *testing.T) {
|
||||||
svr := newTestServer(t, nil)
|
svr := newTestServer(t, nil)
|
||||||
|
@ -2862,27 +2849,11 @@ func TestDataCoordServer_SetSegmentState(t *testing.T) {
|
||||||
assert.EqualValues(t, commonpb.SegmentState_Flushed, resp.States[0].State)
|
assert.EqualValues(t, commonpb.SegmentState_Flushed, resp.States[0].State)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("dataCoord meta set state error", func(t *testing.T) {
|
t.Run("dataCoord meta set state not exists", func(t *testing.T) {
|
||||||
meta, err := newMeta(context.TODO(), &mockTxnKVext{}, "")
|
meta, err := newMemoryMeta()
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
svr := newTestServerWithMeta(t, nil, meta)
|
svr := newTestServerWithMeta(t, nil, meta)
|
||||||
defer closeTestServer(t, svr)
|
defer closeTestServer(t, svr)
|
||||||
segment := &datapb.SegmentInfo{
|
|
||||||
ID: 1000,
|
|
||||||
CollectionID: 100,
|
|
||||||
PartitionID: 0,
|
|
||||||
InsertChannel: "c1",
|
|
||||||
NumOfRows: 0,
|
|
||||||
State: commonpb.SegmentState_Growing,
|
|
||||||
StartPosition: &internalpb.MsgPosition{
|
|
||||||
ChannelName: "c1",
|
|
||||||
MsgID: []byte{},
|
|
||||||
MsgGroup: "",
|
|
||||||
Timestamp: 0,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
err2 := svr.meta.AddSegment(NewSegmentInfo(segment))
|
|
||||||
assert.NotNil(t, err2)
|
|
||||||
// Set segment state.
|
// Set segment state.
|
||||||
svr.SetSegmentState(context.TODO(), &datapb.SetSegmentStateRequest{
|
svr.SetSegmentState(context.TODO(), &datapb.SetSegmentStateRequest{
|
||||||
SegmentId: 1000,
|
SegmentId: 1000,
|
||||||
|
|
|
@ -50,7 +50,7 @@ type BaseKV interface {
|
||||||
Close()
|
Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
//go:generate mockery --name=TxnKV
|
//go:generate mockery --name=TxnKV --with-expecter
|
||||||
// TxnKV contains extra txn operations of kv. The extra operations is transactional.
|
// TxnKV contains extra txn operations of kv. The extra operations is transactional.
|
||||||
type TxnKV interface {
|
type TxnKV interface {
|
||||||
BaseKV
|
BaseKV
|
||||||
|
@ -78,7 +78,7 @@ type MetaKv interface {
|
||||||
CompareVersionAndSwap(key string, version int64, target string, opts ...clientv3.OpOption) (bool, error)
|
CompareVersionAndSwap(key string, version int64, target string, opts ...clientv3.OpOption) (bool, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
//go:generate mockery --name=SnapShotKV
|
//go:generate mockery --name=SnapShotKV --with-expecter
|
||||||
// SnapShotKV is TxnKV for snapshot data. It must save timestamp.
|
// SnapShotKV is TxnKV for snapshot data. It must save timestamp.
|
||||||
type SnapShotKV interface {
|
type SnapShotKV interface {
|
||||||
Save(key string, value string, ts typeutil.Timestamp) error
|
Save(key string, value string, ts typeutil.Timestamp) error
|
||||||
|
|
|
@ -1,173 +0,0 @@
|
||||||
// Licensed to the LF AI & Data foundation under one
|
|
||||||
// or more contributor license agreements. See the NOTICE file
|
|
||||||
// distributed with this work for additional information
|
|
||||||
// regarding copyright ownership. The ASF licenses this file
|
|
||||||
// to you under the Apache License, Version 2.0 (the
|
|
||||||
// "License"); you may not use this file except in compliance
|
|
||||||
// with the License. You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package kv
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/milvus-io/milvus/internal/log"
|
|
||||||
clientv3 "go.etcd.io/etcd/client/v3"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
)
|
|
||||||
|
|
||||||
type MockBaseKV struct {
|
|
||||||
InMemKv sync.Map
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MockBaseKV) Load(key string) (string, error) {
|
|
||||||
log.Debug("doing load", zap.String("key", key))
|
|
||||||
if val, ok := m.InMemKv.Load(key); ok {
|
|
||||||
return val.(string), nil
|
|
||||||
}
|
|
||||||
return "", nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MockBaseKV) MultiLoad(keys []string) ([]string, error) {
|
|
||||||
panic("not implemented") // TODO: Implement
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MockBaseKV) LoadWithPrefix(key string) ([]string, []string, error) {
|
|
||||||
panic("not implemented") // TODO: Implement
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MockBaseKV) Save(key string, value string) error {
|
|
||||||
m.InMemKv.Store(key, value)
|
|
||||||
log.Debug("doing Save", zap.String("key", key))
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MockBaseKV) MultiSave(kvs map[string]string) error {
|
|
||||||
panic("not implemented") // TODO: Implement
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MockBaseKV) Remove(key string) error {
|
|
||||||
m.InMemKv.Delete(key)
|
|
||||||
log.Debug("doing Remove", zap.String("key", key))
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MockBaseKV) MultiRemove(keys []string) error {
|
|
||||||
panic("not implemented") // TODO: Implement
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MockBaseKV) RemoveWithPrefix(key string) error {
|
|
||||||
panic("not implemented") // TODO: Implement
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MockBaseKV) Close() {
|
|
||||||
panic("not implemented") // TODO: Implement
|
|
||||||
}
|
|
||||||
|
|
||||||
type MockTxnKV struct {
|
|
||||||
MockBaseKV
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MockTxnKV) MultiSaveAndRemove(saves map[string]string, removals []string) error {
|
|
||||||
panic("not implemented") // TODO: Implement
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MockTxnKV) MultiRemoveWithPrefix(keys []string) error {
|
|
||||||
panic("not implemented") // TODO: Implement
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MockTxnKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error {
|
|
||||||
panic("not implemented") // TODO: Implement
|
|
||||||
}
|
|
||||||
|
|
||||||
type MockMetaKV struct {
|
|
||||||
MockTxnKV
|
|
||||||
|
|
||||||
LoadWithPrefixMockErr bool
|
|
||||||
SaveMockErr bool
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MockMetaKV) GetPath(key string) string {
|
|
||||||
panic("not implemented") // TODO: Implement
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MockMetaKV) LoadWithPrefix(prefix string) ([]string, []string, error) {
|
|
||||||
if m.LoadWithPrefixMockErr {
|
|
||||||
return nil, nil, errors.New("mock err")
|
|
||||||
}
|
|
||||||
keys := make([]string, 0)
|
|
||||||
values := make([]string, 0)
|
|
||||||
m.InMemKv.Range(func(key, value interface{}) bool {
|
|
||||||
if strings.HasPrefix(key.(string), prefix) {
|
|
||||||
keys = append(keys, key.(string))
|
|
||||||
values = append(values, value.(string))
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
return keys, values, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MockMetaKV) LoadWithPrefix2(key string) ([]string, []string, []int64, error) {
|
|
||||||
panic("not implemented") // TODO: Implement
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MockMetaKV) LoadWithRevisionAndVersions(key string) ([]string, []string, []int64, int64, error) {
|
|
||||||
panic("not implemented") // TODO: Implement
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MockMetaKV) LoadWithRevision(key string) ([]string, []string, int64, error) {
|
|
||||||
panic("not implemented") // TODO: Implement
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MockMetaKV) Watch(key string) clientv3.WatchChan {
|
|
||||||
panic("not implemented") // TODO: Implement
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MockMetaKV) WatchWithPrefix(key string) clientv3.WatchChan {
|
|
||||||
panic("not implemented") // TODO: Implement
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MockMetaKV) WatchWithRevision(key string, revision int64) clientv3.WatchChan {
|
|
||||||
panic("not implemented") // TODO: Implement
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MockMetaKV) SaveWithLease(key, value string, id clientv3.LeaseID) error {
|
|
||||||
m.InMemKv.Store(key, value)
|
|
||||||
log.Debug("Doing SaveWithLease", zap.String("key", key))
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MockMetaKV) SaveWithIgnoreLease(key, value string) error {
|
|
||||||
if m.SaveMockErr {
|
|
||||||
return errors.New("mock error")
|
|
||||||
}
|
|
||||||
m.InMemKv.Store(key, value)
|
|
||||||
log.Debug("Doing SaveWithIgnoreLease", zap.String("key", key))
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MockMetaKV) Grant(ttl int64) (id clientv3.LeaseID, err error) {
|
|
||||||
return 1, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MockMetaKV) KeepAlive(id clientv3.LeaseID) (<-chan *clientv3.LeaseKeepAliveResponse, error) {
|
|
||||||
panic("not implemented") // TODO: Implement
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MockMetaKV) CompareValueAndSwap(key, value, target string, opts ...clientv3.OpOption) (bool, error) {
|
|
||||||
panic("not implemented") // TODO: Implement
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MockMetaKV) CompareVersionAndSwap(key string, version int64, target string, opts ...clientv3.OpOption) (bool, error) {
|
|
||||||
panic("not implemented") // TODO: Implement
|
|
||||||
}
|
|
|
@ -1,129 +0,0 @@
|
||||||
// Licensed to the LF AI & Data foundation under one
|
|
||||||
// or more contributor license agreements. See the NOTICE file
|
|
||||||
// distributed with this work for additional information
|
|
||||||
// regarding copyright ownership. The ASF licenses this file
|
|
||||||
// to you under the Apache License, Version 2.0 (the
|
|
||||||
// "License"); you may not use this file except in compliance
|
|
||||||
// with the License. You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package kv
|
|
||||||
|
|
||||||
import (
|
|
||||||
"sync"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
clientv3 "go.etcd.io/etcd/client/v3"
|
|
||||||
)
|
|
||||||
|
|
||||||
const testKey = "key"
|
|
||||||
const testValue = "value"
|
|
||||||
|
|
||||||
func TestMockKV_MetaKV(t *testing.T) {
|
|
||||||
mockKv := &MockMetaKV{}
|
|
||||||
mockKv.InMemKv = sync.Map{}
|
|
||||||
|
|
||||||
var err error
|
|
||||||
value, err := mockKv.Load(testKey)
|
|
||||||
assert.Equal(t, "", value)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
assert.Panics(t, func() {
|
|
||||||
mockKv.MultiLoad([]string{testKey})
|
|
||||||
})
|
|
||||||
|
|
||||||
_, _, err = mockKv.LoadWithPrefix(testKey)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
assert.NoError(t, mockKv.Save(testKey, testValue))
|
|
||||||
|
|
||||||
assert.Panics(t, func() {
|
|
||||||
mockKv.MultiSave(map[string]string{testKey: testValue})
|
|
||||||
})
|
|
||||||
|
|
||||||
assert.NoError(t, mockKv.Remove(testKey))
|
|
||||||
|
|
||||||
assert.Panics(t, func() {
|
|
||||||
mockKv.MultiRemove([]string{testKey})
|
|
||||||
})
|
|
||||||
|
|
||||||
assert.Panics(t, func() {
|
|
||||||
mockKv.RemoveWithPrefix(testKey)
|
|
||||||
})
|
|
||||||
|
|
||||||
assert.Panics(t, func() {
|
|
||||||
mockKv.Close()
|
|
||||||
})
|
|
||||||
|
|
||||||
assert.Panics(t, func() {
|
|
||||||
mockKv.MultiSaveAndRemove(map[string]string{testKey: testValue}, []string{testKey})
|
|
||||||
})
|
|
||||||
|
|
||||||
assert.Panics(t, func() {
|
|
||||||
mockKv.MultiRemoveWithPrefix([]string{testKey})
|
|
||||||
})
|
|
||||||
|
|
||||||
assert.Panics(t, func() {
|
|
||||||
mockKv.MultiSaveAndRemoveWithPrefix(map[string]string{testKey: testValue}, []string{testKey})
|
|
||||||
})
|
|
||||||
|
|
||||||
assert.Panics(t, func() {
|
|
||||||
mockKv.GetPath(testKey)
|
|
||||||
})
|
|
||||||
|
|
||||||
assert.Panics(t, func() {
|
|
||||||
mockKv.LoadWithPrefix2(testKey)
|
|
||||||
})
|
|
||||||
|
|
||||||
assert.Panics(t, func() {
|
|
||||||
mockKv.LoadWithPrefix2(testKey)
|
|
||||||
})
|
|
||||||
|
|
||||||
assert.Panics(t, func() {
|
|
||||||
mockKv.LoadWithRevisionAndVersions(testKey)
|
|
||||||
})
|
|
||||||
|
|
||||||
assert.Panics(t, func() {
|
|
||||||
mockKv.LoadWithRevision(testKey)
|
|
||||||
})
|
|
||||||
|
|
||||||
assert.Panics(t, func() {
|
|
||||||
mockKv.Watch(testKey)
|
|
||||||
})
|
|
||||||
|
|
||||||
assert.Panics(t, func() {
|
|
||||||
mockKv.WatchWithPrefix(testKey)
|
|
||||||
})
|
|
||||||
|
|
||||||
assert.Panics(t, func() {
|
|
||||||
mockKv.WatchWithRevision(testKey, 100)
|
|
||||||
})
|
|
||||||
|
|
||||||
err = mockKv.SaveWithLease(testKey, testValue, 100)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
err = mockKv.SaveWithIgnoreLease(testKey, testValue)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
leaseID, err := mockKv.Grant(100)
|
|
||||||
assert.Equal(t, clientv3.LeaseID(1), leaseID)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
assert.Panics(t, func() {
|
|
||||||
mockKv.KeepAlive(100)
|
|
||||||
})
|
|
||||||
assert.Panics(t, func() {
|
|
||||||
mockKv.CompareValueAndSwap(testKey, testValue, testValue)
|
|
||||||
})
|
|
||||||
assert.Panics(t, func() {
|
|
||||||
mockKv.CompareVersionAndSwap(testKey, 100, testKey)
|
|
||||||
})
|
|
||||||
}
|
|
|
@ -1,59 +0,0 @@
|
||||||
package kv
|
|
||||||
|
|
||||||
type TxnKVMock struct {
|
|
||||||
TxnKV
|
|
||||||
SaveF func(key, value string) error
|
|
||||||
RemoveF func(key string) error
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m TxnKVMock) Load(key string) (string, error) {
|
|
||||||
panic("implement me")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m TxnKVMock) MultiLoad(keys []string) ([]string, error) {
|
|
||||||
panic("implement me")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m TxnKVMock) LoadWithPrefix(key string) ([]string, []string, error) {
|
|
||||||
panic("implement me")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m TxnKVMock) Save(key, value string) error {
|
|
||||||
return m.SaveF(key, value)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m TxnKVMock) MultiSave(kvs map[string]string) error {
|
|
||||||
panic("implement me")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m TxnKVMock) Remove(key string) error {
|
|
||||||
return m.RemoveF(key)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m TxnKVMock) MultiRemove(keys []string) error {
|
|
||||||
panic("implement me")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m TxnKVMock) RemoveWithPrefix(key string) error {
|
|
||||||
panic("implement me")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m TxnKVMock) Close() {
|
|
||||||
panic("implement me")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m TxnKVMock) MultiSaveAndRemove(saves map[string]string, removals []string) error {
|
|
||||||
panic("implement me")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m TxnKVMock) MultiRemoveWithPrefix(keys []string) error {
|
|
||||||
panic("implement me")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m TxnKVMock) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error {
|
|
||||||
panic("implement me")
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewMockTxnKV() *TxnKVMock {
|
|
||||||
return &TxnKVMock{}
|
|
||||||
}
|
|
|
@ -9,6 +9,14 @@ type SnapShotKV struct {
|
||||||
mock.Mock
|
mock.Mock
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type SnapShotKV_Expecter struct {
|
||||||
|
mock *mock.Mock
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_m *SnapShotKV) EXPECT() *SnapShotKV_Expecter {
|
||||||
|
return &SnapShotKV_Expecter{mock: &_m.Mock}
|
||||||
|
}
|
||||||
|
|
||||||
// Load provides a mock function with given fields: key, ts
|
// Load provides a mock function with given fields: key, ts
|
||||||
func (_m *SnapShotKV) Load(key string, ts uint64) (string, error) {
|
func (_m *SnapShotKV) Load(key string, ts uint64) (string, error) {
|
||||||
ret := _m.Called(key, ts)
|
ret := _m.Called(key, ts)
|
||||||
|
@ -30,6 +38,30 @@ func (_m *SnapShotKV) Load(key string, ts uint64) (string, error) {
|
||||||
return r0, r1
|
return r0, r1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SnapShotKV_Load_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Load'
|
||||||
|
type SnapShotKV_Load_Call struct {
|
||||||
|
*mock.Call
|
||||||
|
}
|
||||||
|
|
||||||
|
// Load is a helper method to define mock.On call
|
||||||
|
// - key string
|
||||||
|
// - ts uint64
|
||||||
|
func (_e *SnapShotKV_Expecter) Load(key interface{}, ts interface{}) *SnapShotKV_Load_Call {
|
||||||
|
return &SnapShotKV_Load_Call{Call: _e.mock.On("Load", key, ts)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *SnapShotKV_Load_Call) Run(run func(key string, ts uint64)) *SnapShotKV_Load_Call {
|
||||||
|
_c.Call.Run(func(args mock.Arguments) {
|
||||||
|
run(args[0].(string), args[1].(uint64))
|
||||||
|
})
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *SnapShotKV_Load_Call) Return(_a0 string, _a1 error) *SnapShotKV_Load_Call {
|
||||||
|
_c.Call.Return(_a0, _a1)
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
// LoadWithPrefix provides a mock function with given fields: key, ts
|
// LoadWithPrefix provides a mock function with given fields: key, ts
|
||||||
func (_m *SnapShotKV) LoadWithPrefix(key string, ts uint64) ([]string, []string, error) {
|
func (_m *SnapShotKV) LoadWithPrefix(key string, ts uint64) ([]string, []string, error) {
|
||||||
ret := _m.Called(key, ts)
|
ret := _m.Called(key, ts)
|
||||||
|
@ -62,6 +94,30 @@ func (_m *SnapShotKV) LoadWithPrefix(key string, ts uint64) ([]string, []string,
|
||||||
return r0, r1, r2
|
return r0, r1, r2
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SnapShotKV_LoadWithPrefix_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LoadWithPrefix'
|
||||||
|
type SnapShotKV_LoadWithPrefix_Call struct {
|
||||||
|
*mock.Call
|
||||||
|
}
|
||||||
|
|
||||||
|
// LoadWithPrefix is a helper method to define mock.On call
|
||||||
|
// - key string
|
||||||
|
// - ts uint64
|
||||||
|
func (_e *SnapShotKV_Expecter) LoadWithPrefix(key interface{}, ts interface{}) *SnapShotKV_LoadWithPrefix_Call {
|
||||||
|
return &SnapShotKV_LoadWithPrefix_Call{Call: _e.mock.On("LoadWithPrefix", key, ts)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *SnapShotKV_LoadWithPrefix_Call) Run(run func(key string, ts uint64)) *SnapShotKV_LoadWithPrefix_Call {
|
||||||
|
_c.Call.Run(func(args mock.Arguments) {
|
||||||
|
run(args[0].(string), args[1].(uint64))
|
||||||
|
})
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *SnapShotKV_LoadWithPrefix_Call) Return(_a0 []string, _a1 []string, _a2 error) *SnapShotKV_LoadWithPrefix_Call {
|
||||||
|
_c.Call.Return(_a0, _a1, _a2)
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
// MultiSave provides a mock function with given fields: kvs, ts
|
// MultiSave provides a mock function with given fields: kvs, ts
|
||||||
func (_m *SnapShotKV) MultiSave(kvs map[string]string, ts uint64) error {
|
func (_m *SnapShotKV) MultiSave(kvs map[string]string, ts uint64) error {
|
||||||
ret := _m.Called(kvs, ts)
|
ret := _m.Called(kvs, ts)
|
||||||
|
@ -76,6 +132,30 @@ func (_m *SnapShotKV) MultiSave(kvs map[string]string, ts uint64) error {
|
||||||
return r0
|
return r0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SnapShotKV_MultiSave_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MultiSave'
|
||||||
|
type SnapShotKV_MultiSave_Call struct {
|
||||||
|
*mock.Call
|
||||||
|
}
|
||||||
|
|
||||||
|
// MultiSave is a helper method to define mock.On call
|
||||||
|
// - kvs map[string]string
|
||||||
|
// - ts uint64
|
||||||
|
func (_e *SnapShotKV_Expecter) MultiSave(kvs interface{}, ts interface{}) *SnapShotKV_MultiSave_Call {
|
||||||
|
return &SnapShotKV_MultiSave_Call{Call: _e.mock.On("MultiSave", kvs, ts)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *SnapShotKV_MultiSave_Call) Run(run func(kvs map[string]string, ts uint64)) *SnapShotKV_MultiSave_Call {
|
||||||
|
_c.Call.Run(func(args mock.Arguments) {
|
||||||
|
run(args[0].(map[string]string), args[1].(uint64))
|
||||||
|
})
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *SnapShotKV_MultiSave_Call) Return(_a0 error) *SnapShotKV_MultiSave_Call {
|
||||||
|
_c.Call.Return(_a0)
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
// MultiSaveAndRemoveWithPrefix provides a mock function with given fields: saves, removals, ts
|
// MultiSaveAndRemoveWithPrefix provides a mock function with given fields: saves, removals, ts
|
||||||
func (_m *SnapShotKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts uint64) error {
|
func (_m *SnapShotKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts uint64) error {
|
||||||
ret := _m.Called(saves, removals, ts)
|
ret := _m.Called(saves, removals, ts)
|
||||||
|
@ -90,6 +170,31 @@ func (_m *SnapShotKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, remo
|
||||||
return r0
|
return r0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SnapShotKV_MultiSaveAndRemoveWithPrefix_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MultiSaveAndRemoveWithPrefix'
|
||||||
|
type SnapShotKV_MultiSaveAndRemoveWithPrefix_Call struct {
|
||||||
|
*mock.Call
|
||||||
|
}
|
||||||
|
|
||||||
|
// MultiSaveAndRemoveWithPrefix is a helper method to define mock.On call
|
||||||
|
// - saves map[string]string
|
||||||
|
// - removals []string
|
||||||
|
// - ts uint64
|
||||||
|
func (_e *SnapShotKV_Expecter) MultiSaveAndRemoveWithPrefix(saves interface{}, removals interface{}, ts interface{}) *SnapShotKV_MultiSaveAndRemoveWithPrefix_Call {
|
||||||
|
return &SnapShotKV_MultiSaveAndRemoveWithPrefix_Call{Call: _e.mock.On("MultiSaveAndRemoveWithPrefix", saves, removals, ts)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *SnapShotKV_MultiSaveAndRemoveWithPrefix_Call) Run(run func(saves map[string]string, removals []string, ts uint64)) *SnapShotKV_MultiSaveAndRemoveWithPrefix_Call {
|
||||||
|
_c.Call.Run(func(args mock.Arguments) {
|
||||||
|
run(args[0].(map[string]string), args[1].([]string), args[2].(uint64))
|
||||||
|
})
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *SnapShotKV_MultiSaveAndRemoveWithPrefix_Call) Return(_a0 error) *SnapShotKV_MultiSaveAndRemoveWithPrefix_Call {
|
||||||
|
_c.Call.Return(_a0)
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
// Save provides a mock function with given fields: key, value, ts
|
// Save provides a mock function with given fields: key, value, ts
|
||||||
func (_m *SnapShotKV) Save(key string, value string, ts uint64) error {
|
func (_m *SnapShotKV) Save(key string, value string, ts uint64) error {
|
||||||
ret := _m.Called(key, value, ts)
|
ret := _m.Called(key, value, ts)
|
||||||
|
@ -104,6 +209,31 @@ func (_m *SnapShotKV) Save(key string, value string, ts uint64) error {
|
||||||
return r0
|
return r0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SnapShotKV_Save_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Save'
|
||||||
|
type SnapShotKV_Save_Call struct {
|
||||||
|
*mock.Call
|
||||||
|
}
|
||||||
|
|
||||||
|
// Save is a helper method to define mock.On call
|
||||||
|
// - key string
|
||||||
|
// - value string
|
||||||
|
// - ts uint64
|
||||||
|
func (_e *SnapShotKV_Expecter) Save(key interface{}, value interface{}, ts interface{}) *SnapShotKV_Save_Call {
|
||||||
|
return &SnapShotKV_Save_Call{Call: _e.mock.On("Save", key, value, ts)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *SnapShotKV_Save_Call) Run(run func(key string, value string, ts uint64)) *SnapShotKV_Save_Call {
|
||||||
|
_c.Call.Run(func(args mock.Arguments) {
|
||||||
|
run(args[0].(string), args[1].(string), args[2].(uint64))
|
||||||
|
})
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *SnapShotKV_Save_Call) Return(_a0 error) *SnapShotKV_Save_Call {
|
||||||
|
_c.Call.Return(_a0)
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
type mockConstructorTestingTNewSnapShotKV interface {
|
type mockConstructorTestingTNewSnapShotKV interface {
|
||||||
mock.TestingT
|
mock.TestingT
|
||||||
Cleanup(func())
|
Cleanup(func())
|
||||||
|
|
|
@ -66,7 +66,7 @@ var flipTaskStateInterval = 15 * 1000
|
||||||
// importManager manager for import tasks
|
// importManager manager for import tasks
|
||||||
type importManager struct {
|
type importManager struct {
|
||||||
ctx context.Context // reserved
|
ctx context.Context // reserved
|
||||||
taskStore kv.MetaKv // Persistent task info storage.
|
taskStore kv.TxnKV // Persistent task info storage.
|
||||||
busyNodes map[int64]int64 // Set of all current working DataNode IDs and related task create timestamp.
|
busyNodes map[int64]int64 // Set of all current working DataNode IDs and related task create timestamp.
|
||||||
|
|
||||||
// TODO: Make pendingTask a map to improve look up performance.
|
// TODO: Make pendingTask a map to improve look up performance.
|
||||||
|
@ -89,7 +89,7 @@ type importManager struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// newImportManager helper function to create a importManager
|
// newImportManager helper function to create a importManager
|
||||||
func newImportManager(ctx context.Context, client kv.MetaKv,
|
func newImportManager(ctx context.Context, client kv.TxnKV,
|
||||||
idAlloc func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error),
|
idAlloc func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error),
|
||||||
importService func(ctx context.Context, req *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error),
|
importService func(ctx context.Context, req *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error),
|
||||||
markSegmentsDropped func(ctx context.Context, segIDs []typeutil.UniqueID) (*commonpb.Status, error),
|
markSegmentsDropped func(ctx context.Context, segIDs []typeutil.UniqueID) (*commonpb.Status, error),
|
||||||
|
|
|
@ -26,19 +26,17 @@ import (
|
||||||
"github.com/golang/protobuf/proto"
|
"github.com/golang/protobuf/proto"
|
||||||
"github.com/milvus-io/milvus-proto/go-api/commonpb"
|
"github.com/milvus-io/milvus-proto/go-api/commonpb"
|
||||||
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
|
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
|
||||||
"github.com/milvus-io/milvus/internal/kv"
|
memkv "github.com/milvus-io/milvus/internal/kv/mem"
|
||||||
|
"github.com/milvus-io/milvus/internal/kv/mocks"
|
||||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||||
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
||||||
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
|
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
|
||||||
"github.com/milvus-io/milvus/internal/util/importutil"
|
"github.com/milvus-io/milvus/internal/util/importutil"
|
||||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/mock"
|
||||||
)
|
)
|
||||||
|
|
||||||
type customKV struct {
|
|
||||||
kv.MockMetaKV
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestImportManager_NewImportManager(t *testing.T) {
|
func TestImportManager_NewImportManager(t *testing.T) {
|
||||||
var countLock sync.RWMutex
|
var countLock sync.RWMutex
|
||||||
var globalCount = typeutil.UniqueID(0)
|
var globalCount = typeutil.UniqueID(0)
|
||||||
|
@ -54,8 +52,7 @@ func TestImportManager_NewImportManager(t *testing.T) {
|
||||||
Params.RootCoordCfg.ImportTaskRetention = 200
|
Params.RootCoordCfg.ImportTaskRetention = 200
|
||||||
checkPendingTasksInterval = 100
|
checkPendingTasksInterval = 100
|
||||||
cleanUpLoopInterval = 100
|
cleanUpLoopInterval = 100
|
||||||
mockKv := &kv.MockMetaKV{}
|
mockKv := memkv.NewMemoryKV()
|
||||||
mockKv.InMemKv = sync.Map{}
|
|
||||||
ti1 := &datapb.ImportTaskInfo{
|
ti1 := &datapb.ImportTaskInfo{
|
||||||
Id: 100,
|
Id: 100,
|
||||||
State: &datapb.ImportTaskState{
|
State: &datapb.ImportTaskState{
|
||||||
|
@ -136,13 +133,13 @@ func TestImportManager_NewImportManager(t *testing.T) {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
t.Run("importManager init fail because of loadFromTaskStore fail", func(t *testing.T) {
|
t.Run("importManager init fail because of loadFromTaskStore fail", func(t *testing.T) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
|
mockTxnKV := &mocks.TxnKV{}
|
||||||
|
mockTxnKV.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, errors.New("mock error"))
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond)
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil, nil, nil, nil)
|
mgr := newImportManager(ctx, mockTxnKV, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil, nil, nil, nil)
|
||||||
mockKv.LoadWithPrefixMockErr = true
|
|
||||||
defer func() {
|
|
||||||
mockKv.LoadWithPrefixMockErr = false
|
|
||||||
}()
|
|
||||||
assert.NotNil(t, mgr)
|
assert.NotNil(t, mgr)
|
||||||
assert.Panics(t, func() {
|
assert.Panics(t, func() {
|
||||||
mgr.init(context.TODO())
|
mgr.init(context.TODO())
|
||||||
|
@ -152,13 +149,14 @@ func TestImportManager_NewImportManager(t *testing.T) {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
t.Run("sendOutTasks fail", func(t *testing.T) {
|
t.Run("sendOutTasks fail", func(t *testing.T) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
|
mockTxnKV := &mocks.TxnKV{}
|
||||||
|
mockTxnKV.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, nil)
|
||||||
|
mockTxnKV.EXPECT().Save(mock.Anything, mock.Anything).Return(errors.New("mock save error"))
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond)
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil, nil, nil, nil)
|
mgr := newImportManager(ctx, mockTxnKV, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil, nil, nil, nil)
|
||||||
mockKv.SaveMockErr = true
|
|
||||||
defer func() {
|
|
||||||
mockKv.SaveMockErr = false
|
|
||||||
}()
|
|
||||||
assert.NotNil(t, mgr)
|
assert.NotNil(t, mgr)
|
||||||
mgr.init(context.TODO())
|
mgr.init(context.TODO())
|
||||||
})
|
})
|
||||||
|
@ -166,24 +164,23 @@ func TestImportManager_NewImportManager(t *testing.T) {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
t.Run("sendOutTasks fail", func(t *testing.T) {
|
t.Run("sendOutTasks fail", func(t *testing.T) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
|
mockTxnKV := &mocks.TxnKV{}
|
||||||
|
mockTxnKV.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, nil)
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond)
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil, nil, nil, nil)
|
mgr := newImportManager(ctx, mockTxnKV, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil, nil, nil, nil)
|
||||||
assert.NotNil(t, mgr)
|
assert.NotNil(t, mgr)
|
||||||
mgr.init(context.TODO())
|
mgr.init(context.TODO())
|
||||||
func() {
|
func() {
|
||||||
mockKv.SaveMockErr = true
|
mockTxnKV.EXPECT().Save(mock.Anything, mock.Anything).Maybe().Return(errors.New("mock save error"))
|
||||||
defer func() {
|
|
||||||
mockKv.SaveMockErr = false
|
|
||||||
}()
|
|
||||||
mgr.sendOutTasks(context.TODO())
|
mgr.sendOutTasks(context.TODO())
|
||||||
}()
|
}()
|
||||||
|
|
||||||
func() {
|
func() {
|
||||||
|
mockTxnKV.EXPECT().Save(mock.Anything, mock.Anything).Maybe().Return(nil)
|
||||||
mockCallImportServiceErr = true
|
mockCallImportServiceErr = true
|
||||||
defer func() {
|
|
||||||
mockKv.SaveMockErr = false
|
|
||||||
}()
|
|
||||||
mgr.sendOutTasks(context.TODO())
|
mgr.sendOutTasks(context.TODO())
|
||||||
}()
|
}()
|
||||||
})
|
})
|
||||||
|
@ -254,8 +251,7 @@ func TestImportManager_TestSetImportTaskState(t *testing.T) {
|
||||||
Params.RootCoordCfg.ImportTaskRetention = 200
|
Params.RootCoordCfg.ImportTaskRetention = 200
|
||||||
checkPendingTasksInterval = 100
|
checkPendingTasksInterval = 100
|
||||||
cleanUpLoopInterval = 100
|
cleanUpLoopInterval = 100
|
||||||
mockKv := &kv.MockMetaKV{}
|
mockKv := memkv.NewMemoryKV()
|
||||||
mockKv.InMemKv = sync.Map{}
|
|
||||||
ti1 := &datapb.ImportTaskInfo{
|
ti1 := &datapb.ImportTaskInfo{
|
||||||
Id: 100,
|
Id: 100,
|
||||||
State: &datapb.ImportTaskState{
|
State: &datapb.ImportTaskState{
|
||||||
|
@ -324,8 +320,7 @@ func TestImportManager_TestEtcdCleanUp(t *testing.T) {
|
||||||
Params.RootCoordCfg.ImportTaskRetention = 200
|
Params.RootCoordCfg.ImportTaskRetention = 200
|
||||||
checkPendingTasksInterval = 100
|
checkPendingTasksInterval = 100
|
||||||
cleanUpLoopInterval = 100
|
cleanUpLoopInterval = 100
|
||||||
mockKv := &kv.MockMetaKV{}
|
mockKv := memkv.NewMemoryKV()
|
||||||
mockKv.InMemKv = sync.Map{}
|
|
||||||
ti1 := &datapb.ImportTaskInfo{
|
ti1 := &datapb.ImportTaskInfo{
|
||||||
Id: 100,
|
Id: 100,
|
||||||
State: &datapb.ImportTaskState{
|
State: &datapb.ImportTaskState{
|
||||||
|
@ -413,8 +408,7 @@ func TestImportManager_TestFlipTaskStateLoop(t *testing.T) {
|
||||||
Params.RootCoordCfg.ImportTaskRetention = 200
|
Params.RootCoordCfg.ImportTaskRetention = 200
|
||||||
checkPendingTasksInterval = 100
|
checkPendingTasksInterval = 100
|
||||||
cleanUpLoopInterval = 100
|
cleanUpLoopInterval = 100
|
||||||
mockKv := &kv.MockMetaKV{}
|
mockKv := memkv.NewMemoryKV()
|
||||||
mockKv.InMemKv = sync.Map{}
|
|
||||||
ti1 := &datapb.ImportTaskInfo{
|
ti1 := &datapb.ImportTaskInfo{
|
||||||
Id: 100,
|
Id: 100,
|
||||||
State: &datapb.ImportTaskState{
|
State: &datapb.ImportTaskState{
|
||||||
|
@ -566,8 +560,7 @@ func TestImportManager_ImportJob(t *testing.T) {
|
||||||
}
|
}
|
||||||
Params.RootCoordCfg.ImportTaskSubPath = "test_import_task"
|
Params.RootCoordCfg.ImportTaskSubPath = "test_import_task"
|
||||||
colID := int64(100)
|
colID := int64(100)
|
||||||
mockKv := &kv.MockMetaKV{}
|
mockKv := memkv.NewMemoryKV()
|
||||||
mockKv.InMemKv = sync.Map{}
|
|
||||||
callMarkSegmentsDropped := func(ctx context.Context, segIDs []typeutil.UniqueID) (*commonpb.Status, error) {
|
callMarkSegmentsDropped := func(ctx context.Context, segIDs []typeutil.UniqueID) (*commonpb.Status, error) {
|
||||||
return &commonpb.Status{
|
return &commonpb.Status{
|
||||||
ErrorCode: commonpb.ErrorCode_Success,
|
ErrorCode: commonpb.ErrorCode_Success,
|
||||||
|
@ -693,8 +686,7 @@ func TestImportManager_AllDataNodesBusy(t *testing.T) {
|
||||||
}
|
}
|
||||||
Params.RootCoordCfg.ImportTaskSubPath = "test_import_task"
|
Params.RootCoordCfg.ImportTaskSubPath = "test_import_task"
|
||||||
colID := int64(100)
|
colID := int64(100)
|
||||||
mockKv := &kv.MockMetaKV{}
|
mockKv := memkv.NewMemoryKV()
|
||||||
mockKv.InMemKv = sync.Map{}
|
|
||||||
rowReq := &milvuspb.ImportRequest{
|
rowReq := &milvuspb.ImportRequest{
|
||||||
CollectionName: "c1",
|
CollectionName: "c1",
|
||||||
PartitionName: "p1",
|
PartitionName: "p1",
|
||||||
|
@ -790,8 +782,7 @@ func TestImportManager_TaskState(t *testing.T) {
|
||||||
}
|
}
|
||||||
Params.RootCoordCfg.ImportTaskSubPath = "test_import_task"
|
Params.RootCoordCfg.ImportTaskSubPath = "test_import_task"
|
||||||
colID := int64(100)
|
colID := int64(100)
|
||||||
mockKv := &kv.MockMetaKV{}
|
mockKv := memkv.NewMemoryKV()
|
||||||
mockKv.InMemKv = sync.Map{}
|
|
||||||
importServiceFunc := func(ctx context.Context, req *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error) {
|
importServiceFunc := func(ctx context.Context, req *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error) {
|
||||||
return &datapb.ImportTaskResponse{
|
return &datapb.ImportTaskResponse{
|
||||||
Status: &commonpb.Status{
|
Status: &commonpb.Status{
|
||||||
|
@ -891,8 +882,7 @@ func TestImportManager_AllocFail(t *testing.T) {
|
||||||
}
|
}
|
||||||
Params.RootCoordCfg.ImportTaskSubPath = "test_import_task"
|
Params.RootCoordCfg.ImportTaskSubPath = "test_import_task"
|
||||||
colID := int64(100)
|
colID := int64(100)
|
||||||
mockKv := &kv.MockMetaKV{}
|
mockKv := memkv.NewMemoryKV()
|
||||||
mockKv.InMemKv = sync.Map{}
|
|
||||||
importServiceFunc := func(ctx context.Context, req *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error) {
|
importServiceFunc := func(ctx context.Context, req *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error) {
|
||||||
return &datapb.ImportTaskResponse{
|
return &datapb.ImportTaskResponse{
|
||||||
Status: &commonpb.Status{
|
Status: &commonpb.Status{
|
||||||
|
@ -931,8 +921,7 @@ func TestImportManager_ListAllTasks(t *testing.T) {
|
||||||
|
|
||||||
Params.RootCoordCfg.ImportTaskSubPath = "test_import_task"
|
Params.RootCoordCfg.ImportTaskSubPath = "test_import_task"
|
||||||
colID := int64(100)
|
colID := int64(100)
|
||||||
mockKv := &kv.MockMetaKV{}
|
mockKv := memkv.NewMemoryKV()
|
||||||
mockKv.InMemKv = sync.Map{}
|
|
||||||
|
|
||||||
// reject some tasks so there are 3 tasks left in pending list
|
// reject some tasks so there are 3 tasks left in pending list
|
||||||
fn := func(ctx context.Context, req *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error) {
|
fn := func(ctx context.Context, req *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error) {
|
||||||
|
|
|
@ -9,7 +9,6 @@ import (
|
||||||
"github.com/milvus-io/milvus-proto/go-api/commonpb"
|
"github.com/milvus-io/milvus-proto/go-api/commonpb"
|
||||||
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
|
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
|
||||||
"github.com/milvus-io/milvus/internal/allocator"
|
"github.com/milvus-io/milvus/internal/allocator"
|
||||||
"github.com/milvus-io/milvus/internal/kv"
|
|
||||||
"github.com/milvus-io/milvus/internal/log"
|
"github.com/milvus-io/milvus/internal/log"
|
||||||
"github.com/milvus-io/milvus/internal/metastore/model"
|
"github.com/milvus-io/milvus/internal/metastore/model"
|
||||||
"github.com/milvus-io/milvus/internal/mq/msgstream"
|
"github.com/milvus-io/milvus/internal/mq/msgstream"
|
||||||
|
@ -250,17 +249,6 @@ func newMockTsoAllocator() *tso.MockAllocator {
|
||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTxnKV() *kv.TxnKVMock {
|
|
||||||
r := kv.NewMockTxnKV()
|
|
||||||
r.SaveF = func(key, value string) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
r.RemoveF = func(key string) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return r
|
|
||||||
}
|
|
||||||
|
|
||||||
type mockProxy struct {
|
type mockProxy struct {
|
||||||
types.Proxy
|
types.Proxy
|
||||||
InvalidateCollectionMetaCacheFunc func(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error)
|
InvalidateCollectionMetaCacheFunc func(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error)
|
||||||
|
|
|
@ -13,7 +13,7 @@ import (
|
||||||
"github.com/milvus-io/milvus-proto/go-api/commonpb"
|
"github.com/milvus-io/milvus-proto/go-api/commonpb"
|
||||||
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
|
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
|
||||||
"github.com/milvus-io/milvus/internal/allocator"
|
"github.com/milvus-io/milvus/internal/allocator"
|
||||||
"github.com/milvus-io/milvus/internal/kv"
|
memkv "github.com/milvus-io/milvus/internal/kv/mem"
|
||||||
"github.com/milvus-io/milvus/internal/metastore/model"
|
"github.com/milvus-io/milvus/internal/metastore/model"
|
||||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||||
"github.com/milvus-io/milvus/internal/proto/etcdpb"
|
"github.com/milvus-io/milvus/internal/proto/etcdpb"
|
||||||
|
@ -864,8 +864,7 @@ func TestCore_Import(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCore_GetImportState(t *testing.T) {
|
func TestCore_GetImportState(t *testing.T) {
|
||||||
mockKv := &kv.MockMetaKV{}
|
mockKv := memkv.NewMemoryKV()
|
||||||
mockKv.InMemKv = sync.Map{}
|
|
||||||
ti1 := &datapb.ImportTaskInfo{
|
ti1 := &datapb.ImportTaskInfo{
|
||||||
Id: 100,
|
Id: 100,
|
||||||
State: &datapb.ImportTaskState{
|
State: &datapb.ImportTaskState{
|
||||||
|
@ -914,8 +913,7 @@ func TestCore_GetImportState(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCore_ListImportTasks(t *testing.T) {
|
func TestCore_ListImportTasks(t *testing.T) {
|
||||||
mockKv := &kv.MockMetaKV{}
|
mockKv := memkv.NewMemoryKV()
|
||||||
mockKv.InMemKv = sync.Map{}
|
|
||||||
ti1 := &datapb.ImportTaskInfo{
|
ti1 := &datapb.ImportTaskInfo{
|
||||||
Id: 100,
|
Id: 100,
|
||||||
CollectionName: "collection-A",
|
CollectionName: "collection-A",
|
||||||
|
@ -980,8 +978,7 @@ func TestCore_ReportImport(t *testing.T) {
|
||||||
globalCount++
|
globalCount++
|
||||||
return globalCount, 0, nil
|
return globalCount, 0, nil
|
||||||
}
|
}
|
||||||
mockKv := &kv.MockMetaKV{}
|
mockKv := memkv.NewMemoryKV()
|
||||||
mockKv.InMemKv = sync.Map{}
|
|
||||||
ti1 := &datapb.ImportTaskInfo{
|
ti1 := &datapb.ImportTaskInfo{
|
||||||
Id: 100,
|
Id: 100,
|
||||||
State: &datapb.ImportTaskState{
|
State: &datapb.ImportTaskState{
|
||||||
|
|
Loading…
Reference in New Issue