Add predicates for TxnKV operations (#27365)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/27394/head
congqixia 2023-09-27 10:21:26 +08:00 committed by GitHub
parent 2d6a968233
commit a3dd2756cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 1585 additions and 713 deletions

View File

@ -440,6 +440,7 @@ generate-mockery-kv: getdeps
$(INSTALL_PATH)/mockery --name=MetaKv --dir=$(PWD)/internal/kv --output=$(PWD)/internal/kv/mocks --filename=meta_kv.go --with-expecter
$(INSTALL_PATH)/mockery --name=WatchKV --dir=$(PWD)/internal/kv --output=$(PWD)/internal/kv/mocks --filename=watch_kv.go --with-expecter
$(INSTALL_PATH)/mockery --name=SnapShotKV --dir=$(PWD)/internal/kv --output=$(PWD)/internal/kv/mocks --filename=snapshot_kv.go --with-expecter
$(INSTALL_PATH)/mockery --name=Predicate --dir=$(PWD)/internal/kv/predicates --output=$(PWD)/internal/kv/predicates --filename=mock_predicate.go --with-expecter --inpackage
generate-mockery-pkg:
$(MAKE) -C pkg generate-mockery

View File

@ -28,6 +28,7 @@ import (
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/kv/mocks"
"github.com/milvus-io/milvus/internal/kv/predicates"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/testutils"
@ -73,7 +74,7 @@ func genChannelOperations(from, to int64, num int) ChannelOpSet {
func TestChannelStore_Update(t *testing.T) {
txnKv := mocks.NewTxnKV(t)
txnKv.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything).Run(func(saves map[string]string, removals []string) {
txnKv.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything).Run(func(saves map[string]string, removals []string, preds ...predicates.Predicate) {
assert.False(t, len(saves)+len(removals) > 128, "too many operations")
}).Return(nil)

View File

@ -30,8 +30,10 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/kv/predicates"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
)
// implementation assertion
@ -421,7 +423,12 @@ func (kv *EmbedEtcdKV) MultiRemove(keys []string) error {
}
// MultiSaveAndRemove saves the key-value pairs and removes the keys in a transaction.
func (kv *EmbedEtcdKV) MultiSaveAndRemove(saves map[string]string, removals []string) error {
func (kv *EmbedEtcdKV) MultiSaveAndRemove(saves map[string]string, removals []string, preds ...predicates.Predicate) error {
cmps, err := parsePredicates(kv.rootPath, preds...)
if err != nil {
return err
}
ops := make([]clientv3.Op, 0, len(saves)+len(removals))
for key, value := range saves {
ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), value))
@ -434,8 +441,15 @@ func (kv *EmbedEtcdKV) MultiSaveAndRemove(saves map[string]string, removals []st
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
_, err := kv.client.Txn(ctx).If().Then(ops...).Commit()
return err
resp, err := kv.client.Txn(ctx).If(cmps...).Then(ops...).Commit()
if err != nil {
return err
}
if !resp.Succeeded {
return merr.WrapErrIoFailedReason("failed to execute transaction")
}
return nil
}
// MultiSaveBytesAndRemove saves the key-value pairs and removes the keys in a transaction.
@ -475,7 +489,12 @@ func (kv *EmbedEtcdKV) WatchWithRevision(key string, revision int64) clientv3.Wa
}
// MultiSaveAndRemoveWithPrefix saves kv in @saves and removes the keys with given prefix in @removals.
func (kv *EmbedEtcdKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error {
func (kv *EmbedEtcdKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, preds ...predicates.Predicate) error {
cmps, err := parsePredicates(kv.rootPath, preds...)
if err != nil {
return err
}
ops := make([]clientv3.Op, 0, len(saves)+len(removals))
for key, value := range saves {
ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), value))
@ -488,8 +507,15 @@ func (kv *EmbedEtcdKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, rem
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
_, err := kv.client.Txn(ctx).If().Then(ops...).Commit()
return err
resp, err := kv.client.Txn(ctx).If(cmps...).Then(ops...).Commit()
if err != nil {
return err
}
if !resp.Succeeded {
return merr.WrapErrIoFailedReason("failed to execute transaction")
}
return nil
}
// MultiSaveBytesAndRemoveWithPrefix saves kv in @saves and removes the keys with given prefix in @removals.

View File

@ -18,15 +18,20 @@ package etcdkv_test
import (
"fmt"
"path"
"sort"
"testing"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"golang.org/x/exp/maps"
"github.com/milvus-io/milvus/internal/kv"
embed_etcd_kv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/kv/predicates"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
@ -825,3 +830,90 @@ func TestEmbedEtcd(te *testing.T) {
assert.False(t, has)
})
}
type EmbedEtcdKVSuite struct {
suite.Suite
param *paramtable.ComponentParam
rootPath string
kv kv.MetaKv
}
func (s *EmbedEtcdKVSuite) SetupSuite() {
te := s.T()
te.Setenv(metricsinfo.DeployModeEnvKey, metricsinfo.StandaloneDeployMode)
param := new(paramtable.ComponentParam)
te.Setenv("etcd.use.embed", "true")
te.Setenv("etcd.config.path", "../../../configs/advanced/etcd.yaml")
dir := te.TempDir()
te.Setenv("etcd.data.dir", dir)
param.Init(paramtable.NewBaseTable())
s.param = param
}
func (s *EmbedEtcdKVSuite) SetupTest() {
s.rootPath = path.Join("unittest/etcdkv", funcutil.RandomString(8))
metaKv, err := embed_etcd_kv.NewMetaKvFactory(s.rootPath, &s.param.EtcdCfg)
s.Require().NoError(err)
s.kv = metaKv
}
func (s *EmbedEtcdKVSuite) TearDownTest() {
if s.kv != nil {
s.kv.RemoveWithPrefix("")
s.kv.Close()
s.kv = nil
}
}
func (s *EmbedEtcdKVSuite) TestTxnWithPredicates() {
etcdKV := s.kv
prepareKV := map[string]string{
"lease1": "1",
"lease2": "2",
}
err := etcdKV.MultiSave(prepareKV)
s.Require().NoError(err)
badPredicate := predicates.NewMockPredicate(s.T())
badPredicate.EXPECT().Type().Return(0)
badPredicate.EXPECT().Target().Return(predicates.PredTargetValue)
multiSaveAndRemovePredTests := []struct {
tag string
multiSave map[string]string
preds []predicates.Predicate
expectSuccess bool
}{
{"predicate_ok", map[string]string{"a": "b"}, []predicates.Predicate{predicates.ValueEqual("lease1", "1")}, true},
{"predicate_fail", map[string]string{"a": "b"}, []predicates.Predicate{predicates.ValueEqual("lease1", "2")}, false},
{"bad_predicate", map[string]string{"a": "b"}, []predicates.Predicate{badPredicate}, false},
}
for _, test := range multiSaveAndRemovePredTests {
s.Run(test.tag, func() {
err := etcdKV.MultiSaveAndRemove(test.multiSave, nil, test.preds...)
if test.expectSuccess {
s.NoError(err)
} else {
s.Error(err)
}
err = etcdKV.MultiSaveAndRemoveWithPrefix(test.multiSave, nil, test.preds...)
if test.expectSuccess {
s.NoError(err)
} else {
s.Error(err)
}
})
}
}
func TestEmbedEtcdKV(t *testing.T) {
suite.Run(t, new(EmbedEtcdKVSuite))
}

View File

@ -26,9 +26,11 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/kv/predicates"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/timerecord"
)
@ -443,7 +445,12 @@ func (kv *etcdKV) MultiRemove(keys []string) error {
}
// MultiSaveAndRemove saves the key-value pairs and removes the keys in a transaction.
func (kv *etcdKV) MultiSaveAndRemove(saves map[string]string, removals []string) error {
func (kv *etcdKV) MultiSaveAndRemove(saves map[string]string, removals []string, preds ...predicates.Predicate) error {
cmps, err := parsePredicates(kv.rootPath, preds...)
if err != nil {
return err
}
start := time.Now()
ops := make([]clientv3.Op, 0, len(saves)+len(removals))
var keys []string
@ -459,7 +466,7 @@ func (kv *etcdKV) MultiSaveAndRemove(saves map[string]string, removals []string)
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
_, err := kv.executeTxn(kv.getTxnWithCmp(ctx), ops...)
resp, err := kv.executeTxn(kv.getTxnWithCmp(ctx, cmps...), ops...)
if err != nil {
log.Warn("Etcd MultiSaveAndRemove error",
zap.Any("saves", saves),
@ -467,9 +474,14 @@ func (kv *etcdKV) MultiSaveAndRemove(saves map[string]string, removals []string)
zap.Int("saveLength", len(saves)),
zap.Int("removeLength", len(removals)),
zap.Error(err))
return err
}
CheckElapseAndWarn(start, "Slow etcd operation multi save and remove", zap.Strings("keys", keys))
return err
if !resp.Succeeded {
log.Warn("failed to executeTxn", zap.Any("resp", resp))
return merr.WrapErrIoFailedReason("failed to execute transaction")
}
return nil
}
// MultiSaveBytesAndRemove saves the key-value pairs and removes the keys in a transaction.
@ -530,7 +542,12 @@ func (kv *etcdKV) WatchWithRevision(key string, revision int64) clientv3.WatchCh
}
// MultiSaveAndRemoveWithPrefix saves kv in @saves and removes the keys with given prefix in @removals.
func (kv *etcdKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error {
func (kv *etcdKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, preds ...predicates.Predicate) error {
cmps, err := parsePredicates(kv.rootPath, preds...)
if err != nil {
return err
}
start := time.Now()
ops := make([]clientv3.Op, 0, len(saves))
var keys []string
@ -546,7 +563,7 @@ func (kv *etcdKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
_, err := kv.executeTxn(kv.getTxnWithCmp(ctx), ops...)
resp, err := kv.executeTxn(kv.getTxnWithCmp(ctx, cmps...), ops...)
if err != nil {
log.Warn("Etcd MultiSaveAndRemoveWithPrefix error",
zap.Any("saves", saves),
@ -554,9 +571,13 @@ func (kv *etcdKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals
zap.Int("saveLength", len(saves)),
zap.Int("removeLength", len(removals)),
zap.Error(err))
return err
}
CheckElapseAndWarn(start, "Slow etcd operation multi save and move with prefix", zap.Strings("keys", keys))
return err
if !resp.Succeeded {
return merr.WrapErrIoFailedReason("failed to execute transaction")
}
return nil
}
// MultiSaveBytesAndRemoveWithPrefix saves kv in @saves and removes the keys with given prefix in @removals.

File diff suppressed because it is too large Load Diff

42
internal/kv/etcd/util.go Normal file
View File

@ -0,0 +1,42 @@
package etcdkv
import (
"fmt"
"path"
clientv3 "go.etcd.io/etcd/client/v3"
"github.com/milvus-io/milvus/internal/kv/predicates"
"github.com/milvus-io/milvus/pkg/util/merr"
)
func parsePredicates(rootPath string, preds ...predicates.Predicate) ([]clientv3.Cmp, error) {
if len(preds) == 0 {
return []clientv3.Cmp{}, nil
}
result := make([]clientv3.Cmp, 0, len(preds))
for _, pred := range preds {
switch pred.Target() {
case predicates.PredTargetValue:
pt, err := parsePredicateType(pred.Type())
if err != nil {
return nil, err
}
cmp := clientv3.Compare(clientv3.Value(path.Join(rootPath, pred.Key())), pt, pred.TargetValue())
result = append(result, cmp)
default:
return nil, merr.WrapErrParameterInvalid("valid predicate target", fmt.Sprintf("%d", pred.Target()))
}
}
return result, nil
}
// parsePredicateType parse predicates.PredicateType to clientv3.Result
func parsePredicateType(pt predicates.PredicateType) (string, error) {
switch pt {
case predicates.PredTypeEqual:
return "=", nil
default:
return "", merr.WrapErrParameterInvalid("valid predicate type", fmt.Sprintf("%d", pt))
}
}

View File

@ -0,0 +1,72 @@
package etcdkv
import (
"testing"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus/internal/kv/predicates"
)
type EtcdKVUtilSuite struct {
suite.Suite
}
func (s *EtcdKVUtilSuite) TestParsePredicateType() {
type testCase struct {
tag string
pt predicates.PredicateType
expectResult string
expectSucceed bool
}
cases := []testCase{
{tag: "equal", pt: predicates.PredTypeEqual, expectResult: "=", expectSucceed: true},
{tag: "zero_value", pt: 0, expectResult: "", expectSucceed: false},
}
for _, tc := range cases {
s.Run(tc.tag, func() {
result, err := parsePredicateType(tc.pt)
if tc.expectSucceed {
s.NoError(err)
s.Equal(tc.expectResult, result)
} else {
s.Error(err)
}
})
}
}
func (s *EtcdKVUtilSuite) TestParsePredicates() {
type testCase struct {
tag string
input []predicates.Predicate
expectSucceed bool
}
badPredicate := predicates.NewMockPredicate(s.T())
badPredicate.EXPECT().Target().Return(0)
cases := []testCase{
{tag: "normal_value_equal", input: []predicates.Predicate{predicates.ValueEqual("a", "b")}, expectSucceed: true},
{tag: "empty_input", input: nil, expectSucceed: true},
{tag: "bad_predicates", input: []predicates.Predicate{badPredicate}, expectSucceed: false},
}
for _, tc := range cases {
s.Run(tc.tag, func() {
result, err := parsePredicates("", tc.input...)
if tc.expectSucceed {
s.NoError(err)
s.Equal(len(tc.input), len(result))
} else {
s.Error(err)
}
})
}
}
func TestEtcdKVUtil(t *testing.T) {
suite.Run(t, new(EtcdKVUtilSuite))
}

View File

@ -19,6 +19,7 @@ package kv
import (
clientv3 "go.etcd.io/etcd/client/v3"
"github.com/milvus-io/milvus/internal/kv/predicates"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -57,8 +58,8 @@ type BaseKV interface {
//go:generate mockery --name=TxnKV --with-expecter
type TxnKV interface {
BaseKV
MultiSaveAndRemove(saves map[string]string, removals []string) error
MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error
MultiSaveAndRemove(saves map[string]string, removals []string, preds ...predicates.Predicate) error
MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, preds ...predicates.Predicate) error
}
// MetaKv is TxnKV for metadata. It should save data with lease.

View File

@ -22,7 +22,9 @@ import (
"github.com/google/btree"
"github.com/milvus-io/milvus/internal/kv/predicates"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/merr"
)
// MemoryKV implements BaseKv interface and relies on underling btree.BTree.
@ -217,7 +219,10 @@ func (kv *MemoryKV) MultiRemove(keys []string) error {
}
// MultiSaveAndRemove saves and removes given key-value pairs in MemoryKV atomicly.
func (kv *MemoryKV) MultiSaveAndRemove(saves map[string]string, removals []string) error {
func (kv *MemoryKV) MultiSaveAndRemove(saves map[string]string, removals []string, preds ...predicates.Predicate) error {
if len(preds) > 0 {
return merr.WrapErrServiceUnavailable("predicates not supported")
}
kv.Lock()
defer kv.Unlock()
for key, value := range saves {
@ -283,7 +288,10 @@ func (kv *MemoryKV) Close() {
}
// MultiSaveAndRemoveWithPrefix saves key-value pairs in @saves, & remove key with prefix in @removals in MemoryKV atomically.
func (kv *MemoryKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error {
func (kv *MemoryKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, preds ...predicates.Predicate) error {
if len(preds) > 0 {
return merr.WrapErrServiceUnavailable("predicates not supported")
}
kv.Lock()
defer kv.Unlock()

View File

@ -20,6 +20,9 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/kv/predicates"
"github.com/milvus-io/milvus/pkg/util/merr"
)
func TestMemoryKV_SaveAndLoadBytes(t *testing.T) {
@ -242,3 +245,16 @@ func TestHasPrefix(t *testing.T) {
assert.NoError(t, err)
assert.False(t, has)
}
func TestPredicates(t *testing.T) {
kv := NewMemoryKV()
// predicates not supported for mem kv for now
err := kv.MultiSaveAndRemove(map[string]string{}, []string{}, predicates.ValueEqual("a", "b"))
assert.Error(t, err)
assert.ErrorIs(t, err, merr.ErrServiceUnavailable)
err = kv.MultiSaveAndRemoveWithPrefix(map[string]string{}, []string{}, predicates.ValueEqual("a", "b"))
assert.Error(t, err)
assert.ErrorIs(t, err, merr.ErrServiceUnavailable)
}

View File

@ -2,7 +2,10 @@
package mocks
import mock "github.com/stretchr/testify/mock"
import (
predicates "github.com/milvus-io/milvus/internal/kv/predicates"
mock "github.com/stretchr/testify/mock"
)
// MetaKv is an autogenerated mock type for the MetaKv type
type MetaKv struct {
@ -502,13 +505,20 @@ func (_c *MetaKv_MultiSave_Call) RunAndReturn(run func(map[string]string) error)
return _c
}
// MultiSaveAndRemove provides a mock function with given fields: saves, removals
func (_m *MetaKv) MultiSaveAndRemove(saves map[string]string, removals []string) error {
ret := _m.Called(saves, removals)
// MultiSaveAndRemove provides a mock function with given fields: saves, removals, preds
func (_m *MetaKv) MultiSaveAndRemove(saves map[string]string, removals []string, preds ...predicates.Predicate) error {
_va := make([]interface{}, len(preds))
for _i := range preds {
_va[_i] = preds[_i]
}
var _ca []interface{}
_ca = append(_ca, saves, removals)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 error
if rf, ok := ret.Get(0).(func(map[string]string, []string) error); ok {
r0 = rf(saves, removals)
if rf, ok := ret.Get(0).(func(map[string]string, []string, ...predicates.Predicate) error); ok {
r0 = rf(saves, removals, preds...)
} else {
r0 = ret.Error(0)
}
@ -524,13 +534,21 @@ type MetaKv_MultiSaveAndRemove_Call struct {
// MultiSaveAndRemove is a helper method to define mock.On call
// - saves map[string]string
// - removals []string
func (_e *MetaKv_Expecter) MultiSaveAndRemove(saves interface{}, removals interface{}) *MetaKv_MultiSaveAndRemove_Call {
return &MetaKv_MultiSaveAndRemove_Call{Call: _e.mock.On("MultiSaveAndRemove", saves, removals)}
// - preds ...predicates.Predicate
func (_e *MetaKv_Expecter) MultiSaveAndRemove(saves interface{}, removals interface{}, preds ...interface{}) *MetaKv_MultiSaveAndRemove_Call {
return &MetaKv_MultiSaveAndRemove_Call{Call: _e.mock.On("MultiSaveAndRemove",
append([]interface{}{saves, removals}, preds...)...)}
}
func (_c *MetaKv_MultiSaveAndRemove_Call) Run(run func(saves map[string]string, removals []string)) *MetaKv_MultiSaveAndRemove_Call {
func (_c *MetaKv_MultiSaveAndRemove_Call) Run(run func(saves map[string]string, removals []string, preds ...predicates.Predicate)) *MetaKv_MultiSaveAndRemove_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(map[string]string), args[1].([]string))
variadicArgs := make([]predicates.Predicate, len(args)-2)
for i, a := range args[2:] {
if a != nil {
variadicArgs[i] = a.(predicates.Predicate)
}
}
run(args[0].(map[string]string), args[1].([]string), variadicArgs...)
})
return _c
}
@ -540,18 +558,25 @@ func (_c *MetaKv_MultiSaveAndRemove_Call) Return(_a0 error) *MetaKv_MultiSaveAnd
return _c
}
func (_c *MetaKv_MultiSaveAndRemove_Call) RunAndReturn(run func(map[string]string, []string) error) *MetaKv_MultiSaveAndRemove_Call {
func (_c *MetaKv_MultiSaveAndRemove_Call) RunAndReturn(run func(map[string]string, []string, ...predicates.Predicate) error) *MetaKv_MultiSaveAndRemove_Call {
_c.Call.Return(run)
return _c
}
// MultiSaveAndRemoveWithPrefix provides a mock function with given fields: saves, removals
func (_m *MetaKv) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error {
ret := _m.Called(saves, removals)
// MultiSaveAndRemoveWithPrefix provides a mock function with given fields: saves, removals, preds
func (_m *MetaKv) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, preds ...predicates.Predicate) error {
_va := make([]interface{}, len(preds))
for _i := range preds {
_va[_i] = preds[_i]
}
var _ca []interface{}
_ca = append(_ca, saves, removals)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 error
if rf, ok := ret.Get(0).(func(map[string]string, []string) error); ok {
r0 = rf(saves, removals)
if rf, ok := ret.Get(0).(func(map[string]string, []string, ...predicates.Predicate) error); ok {
r0 = rf(saves, removals, preds...)
} else {
r0 = ret.Error(0)
}
@ -567,13 +592,21 @@ type MetaKv_MultiSaveAndRemoveWithPrefix_Call struct {
// MultiSaveAndRemoveWithPrefix is a helper method to define mock.On call
// - saves map[string]string
// - removals []string
func (_e *MetaKv_Expecter) MultiSaveAndRemoveWithPrefix(saves interface{}, removals interface{}) *MetaKv_MultiSaveAndRemoveWithPrefix_Call {
return &MetaKv_MultiSaveAndRemoveWithPrefix_Call{Call: _e.mock.On("MultiSaveAndRemoveWithPrefix", saves, removals)}
// - preds ...predicates.Predicate
func (_e *MetaKv_Expecter) MultiSaveAndRemoveWithPrefix(saves interface{}, removals interface{}, preds ...interface{}) *MetaKv_MultiSaveAndRemoveWithPrefix_Call {
return &MetaKv_MultiSaveAndRemoveWithPrefix_Call{Call: _e.mock.On("MultiSaveAndRemoveWithPrefix",
append([]interface{}{saves, removals}, preds...)...)}
}
func (_c *MetaKv_MultiSaveAndRemoveWithPrefix_Call) Run(run func(saves map[string]string, removals []string)) *MetaKv_MultiSaveAndRemoveWithPrefix_Call {
func (_c *MetaKv_MultiSaveAndRemoveWithPrefix_Call) Run(run func(saves map[string]string, removals []string, preds ...predicates.Predicate)) *MetaKv_MultiSaveAndRemoveWithPrefix_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(map[string]string), args[1].([]string))
variadicArgs := make([]predicates.Predicate, len(args)-2)
for i, a := range args[2:] {
if a != nil {
variadicArgs[i] = a.(predicates.Predicate)
}
}
run(args[0].(map[string]string), args[1].([]string), variadicArgs...)
})
return _c
}
@ -583,7 +616,7 @@ func (_c *MetaKv_MultiSaveAndRemoveWithPrefix_Call) Return(_a0 error) *MetaKv_Mu
return _c
}
func (_c *MetaKv_MultiSaveAndRemoveWithPrefix_Call) RunAndReturn(run func(map[string]string, []string) error) *MetaKv_MultiSaveAndRemoveWithPrefix_Call {
func (_c *MetaKv_MultiSaveAndRemoveWithPrefix_Call) RunAndReturn(run func(map[string]string, []string, ...predicates.Predicate) error) *MetaKv_MultiSaveAndRemoveWithPrefix_Call {
_c.Call.Return(run)
return _c
}

View File

@ -2,7 +2,10 @@
package mocks
import mock "github.com/stretchr/testify/mock"
import (
predicates "github.com/milvus-io/milvus/internal/kv/predicates"
mock "github.com/stretchr/testify/mock"
)
// TxnKV is an autogenerated mock type for the TxnKV type
type TxnKV struct {
@ -406,13 +409,20 @@ func (_c *TxnKV_MultiSave_Call) RunAndReturn(run func(map[string]string) error)
return _c
}
// MultiSaveAndRemove provides a mock function with given fields: saves, removals
func (_m *TxnKV) MultiSaveAndRemove(saves map[string]string, removals []string) error {
ret := _m.Called(saves, removals)
// MultiSaveAndRemove provides a mock function with given fields: saves, removals, preds
func (_m *TxnKV) MultiSaveAndRemove(saves map[string]string, removals []string, preds ...predicates.Predicate) error {
_va := make([]interface{}, len(preds))
for _i := range preds {
_va[_i] = preds[_i]
}
var _ca []interface{}
_ca = append(_ca, saves, removals)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 error
if rf, ok := ret.Get(0).(func(map[string]string, []string) error); ok {
r0 = rf(saves, removals)
if rf, ok := ret.Get(0).(func(map[string]string, []string, ...predicates.Predicate) error); ok {
r0 = rf(saves, removals, preds...)
} else {
r0 = ret.Error(0)
}
@ -428,13 +438,21 @@ type TxnKV_MultiSaveAndRemove_Call struct {
// MultiSaveAndRemove is a helper method to define mock.On call
// - saves map[string]string
// - removals []string
func (_e *TxnKV_Expecter) MultiSaveAndRemove(saves interface{}, removals interface{}) *TxnKV_MultiSaveAndRemove_Call {
return &TxnKV_MultiSaveAndRemove_Call{Call: _e.mock.On("MultiSaveAndRemove", saves, removals)}
// - preds ...predicates.Predicate
func (_e *TxnKV_Expecter) MultiSaveAndRemove(saves interface{}, removals interface{}, preds ...interface{}) *TxnKV_MultiSaveAndRemove_Call {
return &TxnKV_MultiSaveAndRemove_Call{Call: _e.mock.On("MultiSaveAndRemove",
append([]interface{}{saves, removals}, preds...)...)}
}
func (_c *TxnKV_MultiSaveAndRemove_Call) Run(run func(saves map[string]string, removals []string)) *TxnKV_MultiSaveAndRemove_Call {
func (_c *TxnKV_MultiSaveAndRemove_Call) Run(run func(saves map[string]string, removals []string, preds ...predicates.Predicate)) *TxnKV_MultiSaveAndRemove_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(map[string]string), args[1].([]string))
variadicArgs := make([]predicates.Predicate, len(args)-2)
for i, a := range args[2:] {
if a != nil {
variadicArgs[i] = a.(predicates.Predicate)
}
}
run(args[0].(map[string]string), args[1].([]string), variadicArgs...)
})
return _c
}
@ -444,18 +462,25 @@ func (_c *TxnKV_MultiSaveAndRemove_Call) Return(_a0 error) *TxnKV_MultiSaveAndRe
return _c
}
func (_c *TxnKV_MultiSaveAndRemove_Call) RunAndReturn(run func(map[string]string, []string) error) *TxnKV_MultiSaveAndRemove_Call {
func (_c *TxnKV_MultiSaveAndRemove_Call) RunAndReturn(run func(map[string]string, []string, ...predicates.Predicate) error) *TxnKV_MultiSaveAndRemove_Call {
_c.Call.Return(run)
return _c
}
// MultiSaveAndRemoveWithPrefix provides a mock function with given fields: saves, removals
func (_m *TxnKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error {
ret := _m.Called(saves, removals)
// MultiSaveAndRemoveWithPrefix provides a mock function with given fields: saves, removals, preds
func (_m *TxnKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, preds ...predicates.Predicate) error {
_va := make([]interface{}, len(preds))
for _i := range preds {
_va[_i] = preds[_i]
}
var _ca []interface{}
_ca = append(_ca, saves, removals)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 error
if rf, ok := ret.Get(0).(func(map[string]string, []string) error); ok {
r0 = rf(saves, removals)
if rf, ok := ret.Get(0).(func(map[string]string, []string, ...predicates.Predicate) error); ok {
r0 = rf(saves, removals, preds...)
} else {
r0 = ret.Error(0)
}
@ -471,13 +496,21 @@ type TxnKV_MultiSaveAndRemoveWithPrefix_Call struct {
// MultiSaveAndRemoveWithPrefix is a helper method to define mock.On call
// - saves map[string]string
// - removals []string
func (_e *TxnKV_Expecter) MultiSaveAndRemoveWithPrefix(saves interface{}, removals interface{}) *TxnKV_MultiSaveAndRemoveWithPrefix_Call {
return &TxnKV_MultiSaveAndRemoveWithPrefix_Call{Call: _e.mock.On("MultiSaveAndRemoveWithPrefix", saves, removals)}
// - preds ...predicates.Predicate
func (_e *TxnKV_Expecter) MultiSaveAndRemoveWithPrefix(saves interface{}, removals interface{}, preds ...interface{}) *TxnKV_MultiSaveAndRemoveWithPrefix_Call {
return &TxnKV_MultiSaveAndRemoveWithPrefix_Call{Call: _e.mock.On("MultiSaveAndRemoveWithPrefix",
append([]interface{}{saves, removals}, preds...)...)}
}
func (_c *TxnKV_MultiSaveAndRemoveWithPrefix_Call) Run(run func(saves map[string]string, removals []string)) *TxnKV_MultiSaveAndRemoveWithPrefix_Call {
func (_c *TxnKV_MultiSaveAndRemoveWithPrefix_Call) Run(run func(saves map[string]string, removals []string, preds ...predicates.Predicate)) *TxnKV_MultiSaveAndRemoveWithPrefix_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(map[string]string), args[1].([]string))
variadicArgs := make([]predicates.Predicate, len(args)-2)
for i, a := range args[2:] {
if a != nil {
variadicArgs[i] = a.(predicates.Predicate)
}
}
run(args[0].(map[string]string), args[1].([]string), variadicArgs...)
})
return _c
}
@ -487,7 +520,7 @@ func (_c *TxnKV_MultiSaveAndRemoveWithPrefix_Call) Return(_a0 error) *TxnKV_Mult
return _c
}
func (_c *TxnKV_MultiSaveAndRemoveWithPrefix_Call) RunAndReturn(run func(map[string]string, []string) error) *TxnKV_MultiSaveAndRemoveWithPrefix_Call {
func (_c *TxnKV_MultiSaveAndRemoveWithPrefix_Call) RunAndReturn(run func(map[string]string, []string, ...predicates.Predicate) error) *TxnKV_MultiSaveAndRemoveWithPrefix_Call {
_c.Call.Return(run)
return _c
}

View File

@ -6,6 +6,8 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
mock "github.com/stretchr/testify/mock"
predicates "github.com/milvus-io/milvus/internal/kv/predicates"
)
// WatchKV is an autogenerated mock type for the WatchKV type
@ -506,13 +508,20 @@ func (_c *WatchKV_MultiSave_Call) RunAndReturn(run func(map[string]string) error
return _c
}
// MultiSaveAndRemove provides a mock function with given fields: saves, removals
func (_m *WatchKV) MultiSaveAndRemove(saves map[string]string, removals []string) error {
ret := _m.Called(saves, removals)
// MultiSaveAndRemove provides a mock function with given fields: saves, removals, preds
func (_m *WatchKV) MultiSaveAndRemove(saves map[string]string, removals []string, preds ...predicates.Predicate) error {
_va := make([]interface{}, len(preds))
for _i := range preds {
_va[_i] = preds[_i]
}
var _ca []interface{}
_ca = append(_ca, saves, removals)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 error
if rf, ok := ret.Get(0).(func(map[string]string, []string) error); ok {
r0 = rf(saves, removals)
if rf, ok := ret.Get(0).(func(map[string]string, []string, ...predicates.Predicate) error); ok {
r0 = rf(saves, removals, preds...)
} else {
r0 = ret.Error(0)
}
@ -528,13 +537,21 @@ type WatchKV_MultiSaveAndRemove_Call struct {
// MultiSaveAndRemove is a helper method to define mock.On call
// - saves map[string]string
// - removals []string
func (_e *WatchKV_Expecter) MultiSaveAndRemove(saves interface{}, removals interface{}) *WatchKV_MultiSaveAndRemove_Call {
return &WatchKV_MultiSaveAndRemove_Call{Call: _e.mock.On("MultiSaveAndRemove", saves, removals)}
// - preds ...predicates.Predicate
func (_e *WatchKV_Expecter) MultiSaveAndRemove(saves interface{}, removals interface{}, preds ...interface{}) *WatchKV_MultiSaveAndRemove_Call {
return &WatchKV_MultiSaveAndRemove_Call{Call: _e.mock.On("MultiSaveAndRemove",
append([]interface{}{saves, removals}, preds...)...)}
}
func (_c *WatchKV_MultiSaveAndRemove_Call) Run(run func(saves map[string]string, removals []string)) *WatchKV_MultiSaveAndRemove_Call {
func (_c *WatchKV_MultiSaveAndRemove_Call) Run(run func(saves map[string]string, removals []string, preds ...predicates.Predicate)) *WatchKV_MultiSaveAndRemove_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(map[string]string), args[1].([]string))
variadicArgs := make([]predicates.Predicate, len(args)-2)
for i, a := range args[2:] {
if a != nil {
variadicArgs[i] = a.(predicates.Predicate)
}
}
run(args[0].(map[string]string), args[1].([]string), variadicArgs...)
})
return _c
}
@ -544,18 +561,25 @@ func (_c *WatchKV_MultiSaveAndRemove_Call) Return(_a0 error) *WatchKV_MultiSaveA
return _c
}
func (_c *WatchKV_MultiSaveAndRemove_Call) RunAndReturn(run func(map[string]string, []string) error) *WatchKV_MultiSaveAndRemove_Call {
func (_c *WatchKV_MultiSaveAndRemove_Call) RunAndReturn(run func(map[string]string, []string, ...predicates.Predicate) error) *WatchKV_MultiSaveAndRemove_Call {
_c.Call.Return(run)
return _c
}
// MultiSaveAndRemoveWithPrefix provides a mock function with given fields: saves, removals
func (_m *WatchKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error {
ret := _m.Called(saves, removals)
// MultiSaveAndRemoveWithPrefix provides a mock function with given fields: saves, removals, preds
func (_m *WatchKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, preds ...predicates.Predicate) error {
_va := make([]interface{}, len(preds))
for _i := range preds {
_va[_i] = preds[_i]
}
var _ca []interface{}
_ca = append(_ca, saves, removals)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 error
if rf, ok := ret.Get(0).(func(map[string]string, []string) error); ok {
r0 = rf(saves, removals)
if rf, ok := ret.Get(0).(func(map[string]string, []string, ...predicates.Predicate) error); ok {
r0 = rf(saves, removals, preds...)
} else {
r0 = ret.Error(0)
}
@ -571,13 +595,21 @@ type WatchKV_MultiSaveAndRemoveWithPrefix_Call struct {
// MultiSaveAndRemoveWithPrefix is a helper method to define mock.On call
// - saves map[string]string
// - removals []string
func (_e *WatchKV_Expecter) MultiSaveAndRemoveWithPrefix(saves interface{}, removals interface{}) *WatchKV_MultiSaveAndRemoveWithPrefix_Call {
return &WatchKV_MultiSaveAndRemoveWithPrefix_Call{Call: _e.mock.On("MultiSaveAndRemoveWithPrefix", saves, removals)}
// - preds ...predicates.Predicate
func (_e *WatchKV_Expecter) MultiSaveAndRemoveWithPrefix(saves interface{}, removals interface{}, preds ...interface{}) *WatchKV_MultiSaveAndRemoveWithPrefix_Call {
return &WatchKV_MultiSaveAndRemoveWithPrefix_Call{Call: _e.mock.On("MultiSaveAndRemoveWithPrefix",
append([]interface{}{saves, removals}, preds...)...)}
}
func (_c *WatchKV_MultiSaveAndRemoveWithPrefix_Call) Run(run func(saves map[string]string, removals []string)) *WatchKV_MultiSaveAndRemoveWithPrefix_Call {
func (_c *WatchKV_MultiSaveAndRemoveWithPrefix_Call) Run(run func(saves map[string]string, removals []string, preds ...predicates.Predicate)) *WatchKV_MultiSaveAndRemoveWithPrefix_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(map[string]string), args[1].([]string))
variadicArgs := make([]predicates.Predicate, len(args)-2)
for i, a := range args[2:] {
if a != nil {
variadicArgs[i] = a.(predicates.Predicate)
}
}
run(args[0].(map[string]string), args[1].([]string), variadicArgs...)
})
return _c
}
@ -587,7 +619,7 @@ func (_c *WatchKV_MultiSaveAndRemoveWithPrefix_Call) Return(_a0 error) *WatchKV_
return _c
}
func (_c *WatchKV_MultiSaveAndRemoveWithPrefix_Call) RunAndReturn(run func(map[string]string, []string) error) *WatchKV_MultiSaveAndRemoveWithPrefix_Call {
func (_c *WatchKV_MultiSaveAndRemoveWithPrefix_Call) RunAndReturn(run func(map[string]string, []string, ...predicates.Predicate) error) *WatchKV_MultiSaveAndRemoveWithPrefix_Call {
_c.Call.Return(run)
return _c
}

View File

@ -0,0 +1,240 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
package predicates
import mock "github.com/stretchr/testify/mock"
// MockPredicate is an autogenerated mock type for the Predicate type
type MockPredicate struct {
mock.Mock
}
type MockPredicate_Expecter struct {
mock *mock.Mock
}
func (_m *MockPredicate) EXPECT() *MockPredicate_Expecter {
return &MockPredicate_Expecter{mock: &_m.Mock}
}
// IsTrue provides a mock function with given fields: _a0
func (_m *MockPredicate) IsTrue(_a0 interface{}) bool {
ret := _m.Called(_a0)
var r0 bool
if rf, ok := ret.Get(0).(func(interface{}) bool); ok {
r0 = rf(_a0)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// MockPredicate_IsTrue_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsTrue'
type MockPredicate_IsTrue_Call struct {
*mock.Call
}
// IsTrue is a helper method to define mock.On call
// - _a0 interface{}
func (_e *MockPredicate_Expecter) IsTrue(_a0 interface{}) *MockPredicate_IsTrue_Call {
return &MockPredicate_IsTrue_Call{Call: _e.mock.On("IsTrue", _a0)}
}
func (_c *MockPredicate_IsTrue_Call) Run(run func(_a0 interface{})) *MockPredicate_IsTrue_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(interface{}))
})
return _c
}
func (_c *MockPredicate_IsTrue_Call) Return(_a0 bool) *MockPredicate_IsTrue_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockPredicate_IsTrue_Call) RunAndReturn(run func(interface{}) bool) *MockPredicate_IsTrue_Call {
_c.Call.Return(run)
return _c
}
// Key provides a mock function with given fields:
func (_m *MockPredicate) Key() string {
ret := _m.Called()
var r0 string
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(string)
}
return r0
}
// MockPredicate_Key_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Key'
type MockPredicate_Key_Call struct {
*mock.Call
}
// Key is a helper method to define mock.On call
func (_e *MockPredicate_Expecter) Key() *MockPredicate_Key_Call {
return &MockPredicate_Key_Call{Call: _e.mock.On("Key")}
}
func (_c *MockPredicate_Key_Call) Run(run func()) *MockPredicate_Key_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockPredicate_Key_Call) Return(_a0 string) *MockPredicate_Key_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockPredicate_Key_Call) RunAndReturn(run func() string) *MockPredicate_Key_Call {
_c.Call.Return(run)
return _c
}
// Target provides a mock function with given fields:
func (_m *MockPredicate) Target() PredicateTarget {
ret := _m.Called()
var r0 PredicateTarget
if rf, ok := ret.Get(0).(func() PredicateTarget); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(PredicateTarget)
}
return r0
}
// MockPredicate_Target_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Target'
type MockPredicate_Target_Call struct {
*mock.Call
}
// Target is a helper method to define mock.On call
func (_e *MockPredicate_Expecter) Target() *MockPredicate_Target_Call {
return &MockPredicate_Target_Call{Call: _e.mock.On("Target")}
}
func (_c *MockPredicate_Target_Call) Run(run func()) *MockPredicate_Target_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockPredicate_Target_Call) Return(_a0 PredicateTarget) *MockPredicate_Target_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockPredicate_Target_Call) RunAndReturn(run func() PredicateTarget) *MockPredicate_Target_Call {
_c.Call.Return(run)
return _c
}
// TargetValue provides a mock function with given fields:
func (_m *MockPredicate) TargetValue() interface{} {
ret := _m.Called()
var r0 interface{}
if rf, ok := ret.Get(0).(func() interface{}); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(interface{})
}
}
return r0
}
// MockPredicate_TargetValue_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'TargetValue'
type MockPredicate_TargetValue_Call struct {
*mock.Call
}
// TargetValue is a helper method to define mock.On call
func (_e *MockPredicate_Expecter) TargetValue() *MockPredicate_TargetValue_Call {
return &MockPredicate_TargetValue_Call{Call: _e.mock.On("TargetValue")}
}
func (_c *MockPredicate_TargetValue_Call) Run(run func()) *MockPredicate_TargetValue_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockPredicate_TargetValue_Call) Return(_a0 interface{}) *MockPredicate_TargetValue_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockPredicate_TargetValue_Call) RunAndReturn(run func() interface{}) *MockPredicate_TargetValue_Call {
_c.Call.Return(run)
return _c
}
// Type provides a mock function with given fields:
func (_m *MockPredicate) Type() PredicateType {
ret := _m.Called()
var r0 PredicateType
if rf, ok := ret.Get(0).(func() PredicateType); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(PredicateType)
}
return r0
}
// MockPredicate_Type_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Type'
type MockPredicate_Type_Call struct {
*mock.Call
}
// Type is a helper method to define mock.On call
func (_e *MockPredicate_Expecter) Type() *MockPredicate_Type_Call {
return &MockPredicate_Type_Call{Call: _e.mock.On("Type")}
}
func (_c *MockPredicate_Type_Call) Run(run func()) *MockPredicate_Type_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockPredicate_Type_Call) Return(_a0 PredicateType) *MockPredicate_Type_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockPredicate_Type_Call) RunAndReturn(run func() PredicateType) *MockPredicate_Type_Call {
_c.Call.Return(run)
return _c
}
// NewMockPredicate creates a new instance of MockPredicate. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockPredicate(t interface {
mock.TestingT
Cleanup(func())
}) *MockPredicate {
mock := &MockPredicate{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -0,0 +1,73 @@
package predicates
// PredicateTarget is enum for Predicate target type.
type PredicateTarget int32
const (
// PredTargetValue is predicate target for key-value perid
PredTargetValue PredicateTarget = iota + 1
)
type PredicateType int32
const (
PredTypeEqual PredicateType = iota + 1
)
// Predicate provides interface for kv predicate.
type Predicate interface {
Target() PredicateTarget
Type() PredicateType
IsTrue(any) bool
Key() string
TargetValue() any
}
type valuePredicate struct {
k, v string
pt PredicateType
}
func (p *valuePredicate) Target() PredicateTarget {
return PredTargetValue
}
func (p *valuePredicate) Type() PredicateType {
return p.pt
}
func (p *valuePredicate) IsTrue(target any) bool {
switch v := target.(type) {
case string:
return predicateValue(p.pt, v, p.v)
case []byte:
return predicateValue(p.pt, string(v), p.v)
default:
return false
}
}
func (p *valuePredicate) Key() string {
return p.k
}
func (p *valuePredicate) TargetValue() any {
return p.v
}
func predicateValue[T comparable](pt PredicateType, v1, v2 T) bool {
switch pt {
case PredTypeEqual:
return v1 == v2
default:
return false
}
}
func ValueEqual(k, v string) Predicate {
return &valuePredicate{
k: k,
v: v,
pt: PredTypeEqual,
}
}

View File

@ -0,0 +1,33 @@
package predicates
import (
"testing"
"github.com/stretchr/testify/suite"
)
type PredicateSuite struct {
suite.Suite
}
func (s *PredicateSuite) TestValueEqual() {
p := ValueEqual("key", "value")
s.Equal("key", p.Key())
s.Equal("value", p.TargetValue())
s.Equal(PredTargetValue, p.Target())
s.Equal(PredTypeEqual, p.Type())
s.True(p.IsTrue("value"))
s.False(p.IsTrue("not_value"))
s.True(p.IsTrue([]byte("value")))
s.False(p.IsTrue(1))
}
func (s *PredicateSuite) TestPredicateValue() {
s.True(predicateValue(PredTypeEqual, 1, 1))
s.False(predicateValue(PredTypeEqual, 1, 2))
s.False(predicateValue(0, 1, 1))
}
func TestPredicates(t *testing.T) {
suite.Run(t, new(PredicateSuite))
}

View File

@ -23,6 +23,8 @@ import (
"github.com/tecbot/gorocksdb"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/kv/predicates"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -389,7 +391,10 @@ func (kv *RocksdbKV) MultiRemove(keys []string) error {
}
// MultiSaveAndRemove provides a transaction to execute a batch of operations
func (kv *RocksdbKV) MultiSaveAndRemove(saves map[string]string, removals []string) error {
func (kv *RocksdbKV) MultiSaveAndRemove(saves map[string]string, removals []string, preds ...predicates.Predicate) error {
if len(preds) > 0 {
return merr.WrapErrServiceUnavailable("predicates not supported")
}
if kv.DB == nil {
return errors.New("Rocksdb instance is nil when do MultiSaveAndRemove")
}
@ -421,7 +426,10 @@ func (kv *RocksdbKV) DeleteRange(startKey, endKey string) error {
}
// MultiSaveAndRemoveWithPrefix is used to execute a batch operators with the same prefix
func (kv *RocksdbKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error {
func (kv *RocksdbKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, preds ...predicates.Predicate) error {
if len(preds) > 0 {
return merr.WrapErrServiceUnavailable("predicates not supported")
}
if kv.DB == nil {
return errors.New("Rocksdb instance is nil when do MultiSaveAndRemove")
}

View File

@ -23,8 +23,11 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/milvus-io/milvus/internal/kv/predicates"
rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb"
"github.com/milvus-io/milvus/pkg/util/merr"
)
func TestRocksdbKV(t *testing.T) {
@ -364,3 +367,20 @@ func TestHasPrefix(t *testing.T) {
assert.NoError(t, err)
assert.False(t, has)
}
func TestPredicates(t *testing.T) {
dir := t.TempDir()
db, err := rocksdbkv.NewRocksdbKV(dir)
require.NoError(t, err)
defer db.Close()
defer db.RemoveWithPrefix("")
err = db.MultiSaveAndRemove(map[string]string{}, []string{}, predicates.ValueEqual("a", "b"))
assert.Error(t, err)
assert.ErrorIs(t, err, merr.ErrServiceUnavailable)
err = db.MultiSaveAndRemoveWithPrefix(map[string]string{}, []string{}, predicates.ValueEqual("a", "b"))
assert.Error(t, err)
assert.ErrorIs(t, err, merr.ErrServiceUnavailable)
}

View File

@ -33,9 +33,11 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/kv/predicates"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/timerecord"
)
@ -426,72 +428,98 @@ func (kv *txnTiKV) RemoveWithPrefix(prefix string) error {
}
// MultiSaveAndRemove saves the key-value pairs and removes the keys in a transaction.
func (kv *txnTiKV) MultiSaveAndRemove(saves map[string]string, removals []string) error {
func (kv *txnTiKV) MultiSaveAndRemove(saves map[string]string, removals []string, preds ...predicates.Predicate) error {
start := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
defer cancel()
var logging_error error
defer logWarnOnFailure(&logging_error, "txnTiKV MultiSaveAndRemove error", zap.Any("saves", saves), zap.Strings("removes", removals), zap.Int("saveLength", len(saves)), zap.Int("removeLength", len(removals)))
var loggingErr error
defer logWarnOnFailure(&loggingErr, "txnTiKV MultiSaveAndRemove error", zap.Any("saves", saves), zap.Strings("removes", removals), zap.Int("saveLength", len(saves)), zap.Int("removeLength", len(removals)))
txn, err := beginTxn(kv.txn)
if err != nil {
logging_error = errors.Wrap(err, "Failed to create txn for MultiSaveAndRemove")
return logging_error
loggingErr = errors.Wrap(err, "Failed to create txn for MultiSaveAndRemove")
return loggingErr
}
// Defer a rollback only if the transaction hasn't been committed
defer rollbackOnFailure(&logging_error, txn)
defer rollbackOnFailure(&loggingErr, txn)
for _, pred := range preds {
key := path.Join(kv.rootPath, pred.Key())
val, err := txn.Get(ctx, []byte(key))
if err != nil {
loggingErr = errors.Wrap(err, fmt.Sprintf("failed to read predicate target (%s:%v) for MultiSaveAndRemove", pred.Key(), pred.TargetValue()))
return loggingErr
}
if !pred.IsTrue(val) {
loggingErr = merr.WrapErrIoFailedReason("failed to meet predicate", fmt.Sprintf("key=%s, value=%v", pred.Key(), pred.TargetValue()))
return loggingErr
}
}
for key, value := range saves {
key = path.Join(kv.rootPath, key)
// Check if value is empty or taking reserved EmptyValue
byte_value, err := convertEmptyStringToByte(value)
if err != nil {
logging_error = errors.Wrap(err, fmt.Sprintf("Failed to cast to byte (%s:%s) for MultiSaveAndRemove", key, value))
return logging_error
loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to cast to byte (%s:%s) for MultiSaveAndRemove", key, value))
return loggingErr
}
err = txn.Set([]byte(key), byte_value)
if err != nil {
logging_error = errors.Wrap(err, fmt.Sprintf("Failed to set (%s:%s) for MultiSaveAndRemove", key, value))
return logging_error
loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to set (%s:%s) for MultiSaveAndRemove", key, value))
return loggingErr
}
}
for _, key := range removals {
key = path.Join(kv.rootPath, key)
if err = txn.Delete([]byte(key)); err != nil {
logging_error = errors.Wrap(err, fmt.Sprintf("Failed to delete %s for MultiSaveAndRemove", key))
return logging_error
loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to delete %s for MultiSaveAndRemove", key))
return loggingErr
}
}
err = kv.executeTxn(txn, ctx)
if err != nil {
logging_error = errors.Wrap(err, "Failed to commit for MultiSaveAndRemove")
return logging_error
loggingErr = errors.Wrap(err, "Failed to commit for MultiSaveAndRemove")
return loggingErr
}
CheckElapseAndWarn(start, "Slow txnTiKV MultiSaveAndRemove() operation", zap.Any("saves", saves), zap.Strings("removals", removals))
return nil
}
// MultiSaveAndRemoveWithPrefix saves kv in @saves and removes the keys with given prefix in @removals.
func (kv *txnTiKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error {
func (kv *txnTiKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, preds ...predicates.Predicate) error {
start := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
defer cancel()
var logging_error error
defer logWarnOnFailure(&logging_error, "txnTiKV MultiSaveAndRemoveWithPrefix() error", zap.Any("saves", saves), zap.Strings("removes", removals), zap.Int("saveLength", len(saves)), zap.Int("removeLength", len(removals)))
var loggingErr error
defer logWarnOnFailure(&loggingErr, "txnTiKV MultiSaveAndRemoveWithPrefix() error", zap.Any("saves", saves), zap.Strings("removes", removals), zap.Int("saveLength", len(saves)), zap.Int("removeLength", len(removals)))
txn, err := beginTxn(kv.txn)
if err != nil {
logging_error = errors.Wrap(err, "Failed to create txn for MultiSaveAndRemoveWithPrefix")
return logging_error
loggingErr = errors.Wrap(err, "Failed to create txn for MultiSaveAndRemoveWithPrefix")
return loggingErr
}
// Defer a rollback only if the transaction hasn't been committed
defer rollbackOnFailure(&logging_error, txn)
defer rollbackOnFailure(&loggingErr, txn)
for _, pred := range preds {
key := path.Join(kv.rootPath, pred.Key())
val, err := txn.Get(ctx, []byte(key))
if err != nil {
loggingErr = errors.Wrap(err, fmt.Sprintf("failed to read predicate target (%s:%v) for MultiSaveAndRemove", pred.Key(), pred.TargetValue()))
return loggingErr
}
if !pred.IsTrue(val) {
loggingErr = merr.WrapErrIoFailedReason("failed to meet predicate", fmt.Sprintf("key=%s, value=%v", pred.Key(), pred.TargetValue()))
return loggingErr
}
}
// Save key-value pairs
for key, value := range saves {
@ -499,13 +527,13 @@ func (kv *txnTiKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removal
// Check if value is empty or taking reserved EmptyValue
byte_value, err := convertEmptyStringToByte(value)
if err != nil {
logging_error = errors.Wrap(err, fmt.Sprintf("Failed to cast to byte (%s:%s) for MultiSaveAndRemoveWithPrefix()", key, value))
return logging_error
loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to cast to byte (%s:%s) for MultiSaveAndRemoveWithPrefix()", key, value))
return loggingErr
}
err = txn.Set([]byte(key), byte_value)
if err != nil {
logging_error = errors.Wrap(err, fmt.Sprintf("Failed to set (%s:%s) for MultiSaveAndRemoveWithPrefix()", key, value))
return logging_error
loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to set (%s:%s) for MultiSaveAndRemoveWithPrefix()", key, value))
return loggingErr
}
}
// Remove keys with prefix
@ -518,31 +546,31 @@ func (kv *txnTiKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removal
// Use Scan to iterate over keys in the prefix range
iter, err := txn.Iter(startKey, endKey)
if err != nil {
logging_error = errors.Wrap(err, fmt.Sprintf("Failed to create iterater for %s during MultiSaveAndRemoveWithPrefix()", prefix))
return logging_error
loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to create iterater for %s during MultiSaveAndRemoveWithPrefix()", prefix))
return loggingErr
}
// Iterate over keys and delete them
for iter.Valid() {
key := iter.Key()
err = txn.Delete(key)
if logging_error != nil {
logging_error = errors.Wrap(err, fmt.Sprintf("Failed to delete %s for MultiSaveAndRemoveWithPrefix", string(key)))
return logging_error
if loggingErr != nil {
loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to delete %s for MultiSaveAndRemoveWithPrefix", string(key)))
return loggingErr
}
// Move the iterator to the next key
err = iter.Next()
if err != nil {
logging_error = errors.Wrap(err, fmt.Sprintf("Failed to move Iterator after key %s for MultiSaveAndRemoveWithPrefix", string(key)))
return logging_error
loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to move Iterator after key %s for MultiSaveAndRemoveWithPrefix", string(key)))
return loggingErr
}
}
}
err = kv.executeTxn(txn, ctx)
if err != nil {
logging_error = errors.Wrap(err, "Failed to commit for MultiSaveAndRemoveWithPrefix")
return logging_error
loggingErr = errors.Wrap(err, "Failed to commit for MultiSaveAndRemoveWithPrefix")
return loggingErr
}
CheckElapseAndWarn(start, "Slow txnTiKV MultiSaveAndRemoveWithPrefix() operation", zap.Any("saves", saves), zap.Strings("removals", removals))
return nil

View File

@ -29,6 +29,8 @@ import (
"github.com/tikv/client-go/v2/txnkv"
"github.com/tikv/client-go/v2/txnkv/transaction"
"golang.org/x/exp/maps"
"github.com/milvus-io/milvus/internal/kv/predicates"
)
func TestTiKVLoad(te *testing.T) {
@ -596,3 +598,45 @@ func TestTiKVUnimplemented(t *testing.T) {
_, err = kv.CompareVersionAndSwap("k", 1, "target")
assert.Error(t, err)
}
func TestTxnWithPredicates(t *testing.T) {
kv := NewTiKV(txnClient, "/")
err := kv.RemoveWithPrefix("")
require.NoError(t, err)
prepareKV := map[string]string{
"lease1": "1",
"lease2": "2",
}
err = kv.MultiSave(prepareKV)
require.NoError(t, err)
multiSaveAndRemovePredTests := []struct {
tag string
multiSave map[string]string
preds []predicates.Predicate
expectSuccess bool
}{
{"predicate_ok", map[string]string{"a": "b"}, []predicates.Predicate{predicates.ValueEqual("lease1", "1")}, true},
{"predicate_fail", map[string]string{"a": "b"}, []predicates.Predicate{predicates.ValueEqual("lease1", "2")}, false},
}
for _, test := range multiSaveAndRemovePredTests {
t.Run(test.tag, func(t *testing.T) {
err := kv.MultiSaveAndRemove(test.multiSave, nil, test.preds...)
t.Log(err)
if test.expectSuccess {
assert.NoError(t, err)
} else {
assert.Error(t, err)
}
err = kv.MultiSaveAndRemoveWithPrefix(test.multiSave, nil, test.preds...)
if test.expectSuccess {
assert.NoError(t, err)
} else {
assert.Error(t, err)
}
})
}
}

View File

@ -554,6 +554,14 @@ func WrapErrIoFailed(key string, msg ...string) error {
return err
}
func WrapErrIoFailedReason(reason string, msg ...string) error {
err := errors.Wrapf(ErrIoFailed, reason)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "; "))
}
return err
}
// Parameter related
func WrapErrParameterInvalid[T any](expected, actual T, msg ...string) error {
err := errors.Wrapf(ErrParameterInvalid, "expected=%v, actual=%v", expected, actual)