change the etcd compareAndSwap interface (#18068)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
pull/18097/head
Xiaofan 2022-07-06 13:54:21 +08:00 committed by GitHub
parent 9f1361036c
commit a9b1d71a8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 186 additions and 197 deletions

View File

@ -79,8 +79,6 @@ const (
ConnectEtcdMaxRetryTime = 100
)
const illegalRequestErrStr = "Illegal request"
// makes sure DataNode implements types.DataNode
var _ types.DataNode = (*DataNode)(nil)
@ -382,11 +380,9 @@ func (node *DataNode) handlePutEvent(watchInfo *datapb.ChannelWatchInfo, version
watchInfo.State = datapb.ChannelWatchState_WatchSuccess
case datapb.ChannelWatchState_ToRelease:
if node.tryToReleaseFlowgraph(vChanName) {
watchInfo.State = datapb.ChannelWatchState_ReleaseSuccess
} else {
watchInfo.State = datapb.ChannelWatchState_ReleaseFailure
}
// there is no reason why we release fail
node.tryToReleaseFlowgraph(vChanName)
watchInfo.State = datapb.ChannelWatchState_ReleaseSuccess
}
v, err := proto.Marshal(watchInfo)
@ -394,32 +390,37 @@ func (node *DataNode) handlePutEvent(watchInfo *datapb.ChannelWatchInfo, version
return fmt.Errorf("fail to marshal watchInfo with state, vChanName: %s, state: %s ,err: %w", vChanName, watchInfo.State.String(), err)
}
k := path.Join(Params.DataNodeCfg.ChannelWatchSubPath, fmt.Sprintf("%d", Params.DataNodeCfg.GetNodeID()), vChanName)
key := path.Join(Params.DataNodeCfg.ChannelWatchSubPath, fmt.Sprintf("%d", Params.DataNodeCfg.GetNodeID()), vChanName)
log.Debug("handle put event: try to save result state", zap.String("key", k), zap.String("state", watchInfo.State.String()))
err = node.watchKv.CompareVersionAndSwap(k, version, string(v))
success, err := node.watchKv.CompareVersionAndSwap(key, version, string(v))
// etcd error, retrying
if err != nil {
return fmt.Errorf("fail to update watch state to etcd, vChanName: %s, state: %s, err: %w", vChanName, watchInfo.State.String(), err)
// flow graph will leak if not release, causing new datanode failed to subscribe
node.tryToReleaseFlowgraph(vChanName)
log.Warn("fail to update watch state to etcd", zap.String("vChanName", vChanName),
zap.String("state", watchInfo.State.String()), zap.Error(err))
return err
}
// etcd valid but the states updated.
if !success {
log.Info("handle put event: failed to compare version and swap, release flowgraph",
zap.String("key", key), zap.String("state", watchInfo.State.String()))
// flow graph will leak if not release, causing new datanode failed to subscribe
node.tryToReleaseFlowgraph(vChanName)
return nil
}
log.Info("handle put event successfully", zap.String("key", key), zap.String("state", watchInfo.State.String()))
return nil
}
func (node *DataNode) handleDeleteEvent(vChanName string) bool {
return node.tryToReleaseFlowgraph(vChanName)
func (node *DataNode) handleDeleteEvent(vChanName string) {
node.tryToReleaseFlowgraph(vChanName)
}
// tryToReleaseFlowgraph tries to release a flowgraph, returns false if failed
func (node *DataNode) tryToReleaseFlowgraph(vChanName string) bool {
success := true
defer func() {
if x := recover(); x != nil {
log.Error("release flowgraph panic", zap.String("vChanName", vChanName), zap.Any("recovered", x))
success = false
}
}()
// tryToReleaseFlowgraph tries to release a flowgraph
func (node *DataNode) tryToReleaseFlowgraph(vChanName string) {
node.flowgraphManager.release(vChanName)
log.Info("try to release flowgraph success", zap.String("vChanName", vChanName))
return success
}
// BackGroundGC runs in background to release datanode resources

View File

@ -757,10 +757,9 @@ func TestWatchChannel(t *testing.T) {
chPut <- struct{}{}
return r
},
func(vChan string) bool {
func(vChan string) {
node.handleDeleteEvent(vChan)
chDel <- struct{}{}
return true
}, time.Millisecond*100,
)
node.eventManagerMap.Store(ch, m)
@ -795,10 +794,9 @@ func TestWatchChannel(t *testing.T) {
chPut <- struct{}{}
return r
},
func(vChan string) bool {
func(vChan string) {
node.handleDeleteEvent(vChan)
chDel <- struct{}{}
return true
}, time.Millisecond*100,
)
node.eventManagerMap.Store(ch, m)

View File

@ -40,7 +40,7 @@ type channelEventManager struct {
eventChan chan event
closeChan chan struct{}
handlePutEvent func(watchInfo *datapb.ChannelWatchInfo, version int64) error // node.handlePutEvent
handleDeleteEvent func(vChanName string) bool // node.handleDeleteEvent
handleDeleteEvent func(vChanName string) // node.handleDeleteEvent
retryInterval time.Duration
}
@ -50,7 +50,7 @@ const (
)
func newChannelEventManager(handlePut func(*datapb.ChannelWatchInfo, int64) error,
handleDel func(string) bool, retryInterval time.Duration) *channelEventManager {
handleDel func(string), retryInterval time.Duration) *channelEventManager {
return &channelEventManager{
eventChan: make(chan event, 10),
closeChan: make(chan struct{}),
@ -130,9 +130,11 @@ func (e *channelEventManager) retryHandlePutEvent(event event) {
}
err = e.handlePutEvent(event.info, event.version)
if err == nil {
log.Info("retry to handle put event successfully",
zap.String("vChanName", event.vChanName))
if err != nil {
log.Warn("failed to handle put event", zap.String("vChanName", event.vChanName), zap.Error(err))
// no need to retry here,
} else {
log.Info("handle put event successfully", zap.String("vChanName", event.vChanName))
return
}
}

View File

@ -34,7 +34,7 @@ func TestChannelEventManager(t *testing.T) {
ran = true
ch <- struct{}{}
return nil
}, func(name string) bool { return true }, time.Millisecond*10)
}, func(name string) {}, time.Millisecond*10)
em.Run()
em.handleEvent(event{
@ -56,7 +56,7 @@ func TestChannelEventManager(t *testing.T) {
ran = true
ch <- struct{}{}
return nil
}, func(name string) bool { return true }, time.Millisecond*10)
}, func(name string) {}, time.Millisecond*10)
em.Run()
em.handleEvent(event{
@ -89,7 +89,7 @@ func TestChannelEventManager(t *testing.T) {
}
return errors.New("mocked error")
}, func(name string) bool { return true }, time.Millisecond*10)
}, func(name string) {}, time.Millisecond*10)
em.Run()
em.handleEvent(event{
@ -107,7 +107,7 @@ func TestChannelEventManager(t *testing.T) {
t.Run("retry until timeout", func(t *testing.T) {
em := newChannelEventManager(func(info *datapb.ChannelWatchInfo, version int64) error {
return errors.New("mocked error")
}, func(name string) bool { return true }, time.Millisecond*100)
}, func(name string) {}, time.Millisecond*100)
ch := make(chan struct{}, 1)
@ -136,7 +136,7 @@ func TestChannelEventManager(t *testing.T) {
ch := make(chan struct{}, 1)
em := newChannelEventManager(func(info *datapb.ChannelWatchInfo, version int64) error {
return errors.New("mocked error")
}, func(name string) bool { return true }, time.Millisecond*10)
}, func(name string) {}, time.Millisecond*10)
go func() {
ddl := time.Now().Add(time.Minute)
@ -172,10 +172,9 @@ func TestChannelEventManager(t *testing.T) {
func(info *datapb.ChannelWatchInfo, version int64) error {
return errors.New("mocked error")
},
func(name string) bool {
func(name string) {
ran = true
ch <- struct{}{}
return true
},
time.Millisecond*10,
)
@ -212,9 +211,7 @@ func TestChannelEventManager(t *testing.T) {
}
return errors.New("mocked error")
},
func(name string) bool {
return false
},
func(name string) {},
time.Millisecond*10)
em.Run()
em.handleEvent(event{
@ -253,9 +250,7 @@ func TestChannelEventManager(t *testing.T) {
func(info *datapb.ChannelWatchInfo, version int64) error {
return errors.New("mocked error")
},
func(name string) bool {
return false
},
func(name string) {},
time.Millisecond*100,
)

View File

@ -42,8 +42,8 @@ import (
// revision: The number of times IndexMeta has been changed in etcd. It's the same as Event.Kv.Version in etcd.
// indexMeta: A structure that records the state of the index defined by proto.
type Meta struct {
indexMeta *indexpb.IndexMeta
revision int64
indexMeta *indexpb.IndexMeta
indexVersion int64
}
// metaTable records the mapping of IndexBuildID to Meta.
@ -91,8 +91,8 @@ func (mt *metaTable) reloadFromKV() error {
}
meta := &Meta{
indexMeta: &indexMeta,
revision: versions[i],
indexMeta: &indexMeta,
indexVersion: versions[i],
}
mt.indexBuildID2Meta[indexMeta.IndexBuildID] = *meta
}
@ -107,15 +107,19 @@ func (mt *metaTable) saveIndexMeta(meta *Meta) error {
return err
}
key := path.Join(indexFilePrefix, strconv.FormatInt(meta.indexMeta.IndexBuildID, 10))
err = mt.client.CompareVersionAndSwap(key, meta.revision, string(value))
log.Debug("IndexCoord metaTable saveIndexMeta ", zap.String("key", key), zap.Error(err))
success, err := mt.client.CompareVersionAndSwap(key, meta.indexVersion, string(value))
if err != nil {
// TODO, we don't need to reload if it is just etcd error
log.Warn("failed to save index meta in etcd", zap.Int64("buildID", meta.indexMeta.IndexBuildID), zap.Error(err))
return err
}
meta.revision = meta.revision + 1
if !success {
log.Warn("failed to save index meta in etcd because version compare failure", zap.Int64("buildID", meta.indexMeta.IndexBuildID), zap.Any("index", meta.indexMeta))
return fmt.Errorf("failed to save index meta in etcd, buildId: %d, source version: %d", meta.indexMeta.IndexBuildID, meta.indexVersion)
}
meta.indexVersion = meta.indexVersion + 1
mt.indexBuildID2Meta[meta.indexMeta.IndexBuildID] = *meta
log.Debug("IndexCoord metaTable saveIndexMeta success", zap.Any("meta.revision", meta.revision))
log.Info("IndexCoord metaTable saveIndexMeta success", zap.Int64("buildID", meta.indexMeta.IndexBuildID), zap.Any("meta.revision", meta.indexVersion))
return nil
}
@ -141,8 +145,8 @@ func (mt *metaTable) reloadMeta(indexBuildID UniqueID) (*Meta, error) {
// return nil, nil
//}
m := &Meta{
revision: version[0],
indexMeta: im,
indexVersion: version[0],
indexMeta: im,
}
return m, nil
@ -165,7 +169,7 @@ func (mt *metaTable) AddIndex(indexBuildID UniqueID, req *indexpb.BuildIndexRequ
NodeID: 0,
Version: 0,
},
revision: 0,
indexVersion: 0,
}
metrics.IndexCoordIndexTaskCounter.WithLabelValues(metrics.UnissuedIndexTaskLabel).Inc()
return mt.saveIndexMeta(meta)
@ -298,8 +302,8 @@ func (mt *metaTable) MarkIndexAsDeletedByBuildIDs(buildIDs []UniqueID) error {
for _, buildID := range buildIDs {
if meta, ok := mt.indexBuildID2Meta[buildID]; ok {
clonedMeta := &Meta{
indexMeta: proto.Clone(meta.indexMeta).(*indexpb.IndexMeta),
revision: meta.revision,
indexMeta: proto.Clone(meta.indexMeta).(*indexpb.IndexMeta),
indexVersion: meta.indexVersion,
}
clonedMeta.indexMeta.MarkDeleted = true
// marshal inside
@ -439,10 +443,10 @@ func (mt *metaTable) GetUnusedIndexFiles(limit int) []Meta {
var metas []Meta
for _, meta := range mt.indexBuildID2Meta {
if meta.indexMeta.State == commonpb.IndexState_Finished && (meta.indexMeta.MarkDeleted || !meta.indexMeta.Recycled) {
metas = append(metas, Meta{indexMeta: proto.Clone(meta.indexMeta).(*indexpb.IndexMeta), revision: meta.revision})
metas = append(metas, Meta{indexMeta: proto.Clone(meta.indexMeta).(*indexpb.IndexMeta), indexVersion: meta.indexVersion})
}
if meta.indexMeta.State == commonpb.IndexState_Unissued && meta.indexMeta.MarkDeleted {
metas = append(metas, Meta{indexMeta: proto.Clone(meta.indexMeta).(*indexpb.IndexMeta), revision: meta.revision})
metas = append(metas, Meta{indexMeta: proto.Clone(meta.indexMeta).(*indexpb.IndexMeta), indexVersion: meta.indexVersion})
}
if len(metas) >= limit {
return metas
@ -473,7 +477,7 @@ func (mt *metaTable) GetUnassignedTasks(onlineNodeIDs []int64) []Meta {
continue
}
if meta.indexMeta.State == commonpb.IndexState_Unissued {
metas = append(metas, Meta{indexMeta: proto.Clone(meta.indexMeta).(*indexpb.IndexMeta), revision: meta.revision})
metas = append(metas, Meta{indexMeta: proto.Clone(meta.indexMeta).(*indexpb.IndexMeta), indexVersion: meta.indexVersion})
continue
}
if meta.indexMeta.State == commonpb.IndexState_Finished || meta.indexMeta.State == commonpb.IndexState_Failed {
@ -488,7 +492,7 @@ func (mt *metaTable) GetUnassignedTasks(onlineNodeIDs []int64) []Meta {
}
if !alive {
log.Info("Reassign because node no longer alive", zap.Any("onlineID", onlineNodeIDs), zap.Int64("nodeID", meta.indexMeta.NodeID))
metas = append(metas, Meta{indexMeta: proto.Clone(meta.indexMeta).(*indexpb.IndexMeta), revision: meta.revision})
metas = append(metas, Meta{indexMeta: proto.Clone(meta.indexMeta).(*indexpb.IndexMeta), indexVersion: meta.indexVersion})
}
}
return sortMetaPolicy(metas)
@ -564,18 +568,18 @@ func (mt *metaTable) HasSameReq(req *indexpb.BuildIndexRequest) (bool, UniqueID)
// LoadMetaFromETCD load the meta of specified indexBuildID from ETCD.
// If the version of meta in memory is greater equal to the version in ETCD, no need to reload.
func (mt *metaTable) LoadMetaFromETCD(indexBuildID int64, revision int64) bool {
func (mt *metaTable) LoadMetaFromETCD(indexBuildID int64, indexVersion int64) bool {
mt.lock.Lock()
defer mt.lock.Unlock()
meta, ok := mt.indexBuildID2Meta[indexBuildID]
log.Debug("IndexCoord metaTable LoadMetaFromETCD", zap.Int64("indexBuildID", indexBuildID),
zap.Int64("revision", revision), zap.Bool("ok", ok))
zap.Int64("indexVersion", indexVersion), zap.Bool("ok", ok))
if ok {
log.Debug("IndexCoord metaTable LoadMetaFromETCD",
zap.Int64("meta.revision", meta.revision),
zap.Int64("revision", revision))
zap.Int64("meta.indexVersion", meta.indexVersion),
zap.Int64("indexVersion", indexVersion))
if meta.revision >= revision {
if meta.indexVersion >= indexVersion {
return false
}
} else {

View File

@ -70,8 +70,8 @@ func TestMetaTable(t *testing.T) {
t.Run("saveIndexMeta", func(t *testing.T) {
meta := &Meta{
indexMeta: indexMeta1,
revision: 10,
indexMeta: indexMeta1,
indexVersion: 10,
}
err = metaTable.saveIndexMeta(meta)
assert.NotNil(t, err)

View File

@ -19,6 +19,7 @@ package indexnode
import (
"context"
"errors"
"fmt"
"sync"
"go.uber.org/zap"
@ -89,11 +90,15 @@ func (inm *Mock) buildIndexTask() {
if err != nil {
return err
}
err = inm.etcdKV.CompareVersionAndSwap(req.MetaPath, versions[0],
string(metaData))
success, err := inm.etcdKV.CompareVersionAndSwap(req.MetaPath, versions[0], string(metaData))
if err != nil {
// TODO, we don't need to reload if it is just etcd error
log.Warn("failed to compare and swap in etcd", zap.Int64("buildID", req.IndexBuildID), zap.Error(err))
return err
}
if !success {
return fmt.Errorf("failed to save index meta in etcd, buildId: %d, source version: %d", req.IndexBuildID, versions[0])
}
return nil
}
err := retry.Do(context.Background(), saveIndexMeta, retry.Attempts(3))
@ -117,11 +122,16 @@ func (inm *Mock) buildIndexTask() {
if err != nil {
return err
}
err = inm.etcdKV.CompareVersionAndSwap(req.MetaPath, versions[0],
string(metaData))
success, err := inm.etcdKV.CompareVersionAndSwap(req.MetaPath, versions[0], string(metaData))
if err != nil {
// TODO, we don't need to reload if it is just etcd error
log.Warn("failed to compare and swap in etcd", zap.Int64("buildID", req.IndexBuildID), zap.Error(err))
return err
}
if !success {
return fmt.Errorf("failed to save index meta in etcd, buildId: %d, source version: %d", req.IndexBuildID, versions[0])
}
indexMeta2 := indexpb.IndexMeta{}
_, values2, versions2, err := inm.etcdKV.LoadWithPrefix2(req.MetaPath)
@ -139,11 +149,15 @@ func (inm *Mock) buildIndexTask() {
if err != nil {
return err
}
err = inm.etcdKV.CompareVersionAndSwap(req.MetaPath, versions2[0],
string(metaData2))
success, err = inm.etcdKV.CompareVersionAndSwap(req.MetaPath, versions2[0], string(metaData2))
if err != nil {
// TODO, we don't need to reload if it is just etcd error
log.Warn("failed to compare and swap in etcd", zap.Int64("buildID", req.IndexBuildID), zap.Error(err))
return err
}
if !success {
return fmt.Errorf("failed to save index meta in etcd, buildId: %d, source version: %d", req.IndexBuildID, versions[0])
}
return nil
}
err := retry.Do(context.Background(), saveIndexMeta, retry.Attempts(3))

View File

@ -196,8 +196,11 @@ func (it *IndexBuildTask) loadIndexMeta(ctx context.Context) (*indexpb.IndexMeta
return &indexMeta, source, nil
}
func (it *IndexBuildTask) updateTaskState(indexMeta *indexpb.IndexMeta) TaskState {
if indexMeta.Version > it.req.Version || indexMeta.State == commonpb.IndexState_Finished {
func (it *IndexBuildTask) updateTaskState(indexMeta *indexpb.IndexMeta, err error) TaskState {
if err != nil {
log.Warn("IndexNode IndexBuildTask internal err, mark the task as retry", zap.Int64("buildID", it.req.IndexBuildID), zap.Error(err))
it.SetState(TaskStateRetry)
} else if indexMeta.Version > it.req.Version || indexMeta.State == commonpb.IndexState_Finished {
it.SetState(TaskStateAbandon)
} else if indexMeta.MarkDeleted {
it.SetState(TaskStateDeleted)
@ -213,19 +216,15 @@ func (it *IndexBuildTask) saveIndexMeta(ctx context.Context) error {
fn := func() error {
indexMeta, version, err := it.loadIndexMeta(ctx)
if err != nil {
errMsg := fmt.Sprintf("IndexNode IndexBuildTask saveIndexMeta fail to load index meta, IndexBuildID=%d", indexMeta.IndexBuildID)
panic(errMsg)
log.Info("IndexNode IndexBuildTask saveIndexMeta fail to load index meta,", zap.Int64("build Id", indexMeta.IndexBuildID), zap.Error(err))
return err
}
if it.internalErr != nil {
log.Warn("IndexNode IndexBuildTask internal err is not nil, mark the task as retry",
zap.Int64("buildID", it.req.IndexBuildID))
it.SetState(TaskStateRetry)
}
taskState := it.updateTaskState(indexMeta)
if taskState == TaskStateDeleted {
taskState := it.updateTaskState(indexMeta, it.internalErr)
if taskState == TaskStateAbandon {
log.Warn("IndexNode IndexBuildTask saveIndexMeta success because task abandon", zap.String("TaskState", taskState.String()),
zap.Int64("IndexBuildID", indexMeta.IndexBuildID))
return nil
} else if taskState == TaskStateDeleted {
log.Info("IndexNode IndexBuildTask saveIndexMeta", zap.String("TaskState", taskState.String()),
zap.Int64("IndexBuildID", indexMeta.IndexBuildID))
indexMeta.State = commonpb.IndexState_Finished
@ -252,14 +251,22 @@ func (it *IndexBuildTask) saveIndexMeta(ctx context.Context) error {
var metaValue []byte
metaValue, err = proto.Marshal(indexMeta)
if err != nil {
errMsg := fmt.Sprintf("IndexNode IndexBuildTask saveIndexMeta fail to marshal index meta, IndexBuildID=%d, err=%s",
indexMeta.IndexBuildID, err.Error())
panic(errMsg)
log.Warn("IndexNode IndexBuildTask saveIndexMeta fail to marshal index meta,", zap.Int64("build Id", indexMeta.IndexBuildID), zap.Error(err))
return err
}
strMetaValue := string(metaValue)
return it.etcdKV.CompareVersionAndSwap(it.req.MetaPath, version, strMetaValue)
success, err := it.etcdKV.CompareVersionAndSwap(it.req.MetaPath, version, strMetaValue)
if err != nil {
// TODO, we don't need to reload if it is just etcd error
log.Warn("failed to compare and swap in etcd", zap.Int64("buildID", it.req.IndexBuildID), zap.Error(err))
return err
}
if !success {
return fmt.Errorf("failed to save index meta in etcd, buildId: %d, source version: %d", it.req.IndexBuildID, version)
}
return nil
}
err := retry.Do(ctx, fn, retry.Attempts(3))
@ -279,7 +286,7 @@ func (it *IndexBuildTask) PreExecute(ctx context.Context) error {
// assume that we can loadIndexMeta later...
return nil
}
it.updateTaskState(indexMeta)
it.updateTaskState(indexMeta, nil)
return nil
}

View File

@ -30,7 +30,6 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/kv"
kvi "github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
)
@ -564,7 +563,7 @@ func (kv *EmbedEtcdKV) KeepAlive(id clientv3.LeaseID) (<-chan *clientv3.LeaseKee
// CompareValueAndSwap compares the existing value with compare, and if they are
// equal, the target is stored in etcd.
func (kv *EmbedEtcdKV) CompareValueAndSwap(key, value, target string, opts ...clientv3.OpOption) error {
func (kv *EmbedEtcdKV) CompareValueAndSwap(key, value, target string, opts ...clientv3.OpOption) (bool, error) {
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
resp, err := kv.client.Txn(ctx).If(
@ -574,18 +573,14 @@ func (kv *EmbedEtcdKV) CompareValueAndSwap(key, value, target string, opts ...cl
value)).
Then(clientv3.OpPut(path.Join(kv.rootPath, key), target, opts...)).Commit()
if err != nil {
return err
return false, err
}
if !resp.Succeeded {
return kvi.NewCompareFailedError(fmt.Errorf("function CompareValueAndSwap error for compare is false for key: %s", key))
}
return nil
return resp.Succeeded, nil
}
// CompareValueAndSwapBytes compares the existing value with compare, and if they are
// equal, the target is stored in etcd.
func (kv *EmbedEtcdKV) CompareValueAndSwapBytes(key string, value, target []byte, opts ...clientv3.OpOption) error {
func (kv *EmbedEtcdKV) CompareValueAndSwapBytes(key string, value, target []byte, opts ...clientv3.OpOption) (bool, error) {
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
resp, err := kv.client.Txn(ctx).If(
@ -595,18 +590,14 @@ func (kv *EmbedEtcdKV) CompareValueAndSwapBytes(key string, value, target []byte
string(value))).
Then(clientv3.OpPut(path.Join(kv.rootPath, key), string(target), opts...)).Commit()
if err != nil {
return err
return false, err
}
if !resp.Succeeded {
return kvi.NewCompareFailedError(fmt.Errorf("function CompareValueAndSwapBytes error for compare is false for key: %s", key))
}
return nil
return resp.Succeeded, nil
}
// CompareVersionAndSwap compares the existing key-value's version with version, and if
// they are equal, the target is stored in etcd.
func (kv *EmbedEtcdKV) CompareVersionAndSwap(key string, version int64, target string, opts ...clientv3.OpOption) error {
func (kv *EmbedEtcdKV) CompareVersionAndSwap(key string, version int64, target string, opts ...clientv3.OpOption) (bool, error) {
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
resp, err := kv.client.Txn(ctx).If(
@ -616,18 +607,14 @@ func (kv *EmbedEtcdKV) CompareVersionAndSwap(key string, version int64, target s
version)).
Then(clientv3.OpPut(path.Join(kv.rootPath, key), target, opts...)).Commit()
if err != nil {
return err
return false, err
}
if !resp.Succeeded {
return kvi.NewCompareFailedError(fmt.Errorf("function CompareAndSwap error for compare is false for key: %s", key))
}
return nil
return resp.Succeeded, nil
}
// CompareVersionAndSwapBytes compares the existing key-value's version with version, and if
// they are equal, the target is stored in etcd.
func (kv *EmbedEtcdKV) CompareVersionAndSwapBytes(key string, version int64, target []byte, opts ...clientv3.OpOption) error {
func (kv *EmbedEtcdKV) CompareVersionAndSwapBytes(key string, version int64, target []byte, opts ...clientv3.OpOption) (bool, error) {
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
resp, err := kv.client.Txn(ctx).If(
@ -637,13 +624,9 @@ func (kv *EmbedEtcdKV) CompareVersionAndSwapBytes(key string, version int64, tar
version)).
Then(clientv3.OpPut(path.Join(kv.rootPath, key), string(target), opts...)).Commit()
if err != nil {
return err
return false, err
}
if !resp.Succeeded {
return kvi.NewCompareFailedError(fmt.Errorf("function CompareVersionAndSwapBytes error for compare is false for key: %s", key))
}
return nil
return resp.Succeeded, nil
}
func (kv *EmbedEtcdKV) GetConfig() embed.Config {

View File

@ -17,12 +17,10 @@
package etcdkv_test
import (
"errors"
"os"
"testing"
"time"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
embed_etcd_kv "github.com/milvus-io/milvus/internal/kv/etcd"
@ -810,24 +808,25 @@ func TestEmbedEtcd(te *testing.T) {
assert.Equal(t, revision+1, resp.Header.Revision)
}
var compareErr *kv.CompareFailedError
err = metaKv.CompareVersionAndSwap("a/b/c", 0, "1")
success, err := metaKv.CompareVersionAndSwap("a/b/c", 0, "1")
assert.NoError(t, err)
assert.True(t, success)
value, err := metaKv.Load("a/b/c")
assert.NoError(t, err)
assert.Equal(t, value, "1")
err = metaKv.CompareVersionAndSwap("a/b/c", 0, "1")
assert.Error(t, err)
assert.True(t, errors.As(err, &compareErr))
success, err = metaKv.CompareVersionAndSwap("a/b/c", 0, "1")
assert.NoError(t, err)
assert.False(t, success)
err = metaKv.CompareValueAndSwap("a/b/c", "1", "2")
success, err = metaKv.CompareValueAndSwap("a/b/c", "1", "2")
assert.True(t, success)
assert.NoError(t, err)
err = metaKv.CompareValueAndSwap("a/b/c", "1", "2")
assert.Error(t, err)
assert.True(t, errors.As(err, &compareErr))
success, err = metaKv.CompareValueAndSwap("a/b/c", "1", "2")
assert.NoError(t, err)
assert.False(t, success)
})
te.Run("Etcd Revision Bytes", func(t *testing.T) {
@ -865,24 +864,25 @@ func TestEmbedEtcd(te *testing.T) {
assert.Equal(t, revision+1, resp.Header.Revision)
}
var compareErr *kv.CompareFailedError
err = metaKv.CompareVersionAndSwapBytes("a/b/c", 0, []byte("1"))
success, err := metaKv.CompareVersionAndSwapBytes("a/b/c", 0, []byte("1"))
assert.NoError(t, err)
assert.True(t, success)
value, err := metaKv.LoadBytes("a/b/c")
assert.NoError(t, err)
assert.Equal(t, value, []byte("1"))
err = metaKv.CompareVersionAndSwapBytes("a/b/c", 0, []byte("1"))
assert.Error(t, err)
assert.True(t, errors.As(err, &compareErr))
success, err = metaKv.CompareVersionAndSwapBytes("a/b/c", 0, []byte("1"))
assert.NoError(t, err)
assert.False(t, success)
err = metaKv.CompareValueAndSwapBytes("a/b/c", []byte("1"), []byte("2"))
success, err = metaKv.CompareValueAndSwapBytes("a/b/c", []byte("1"), []byte("2"))
assert.True(t, success)
assert.NoError(t, err)
err = metaKv.CompareValueAndSwapBytes("a/b/c", []byte("1"), []byte("2"))
assert.Error(t, err)
assert.True(t, errors.As(err, &compareErr))
success, err = metaKv.CompareValueAndSwapBytes("a/b/c", []byte("1"), []byte("2"))
assert.NoError(t, err)
assert.False(t, success)
})

View File

@ -22,7 +22,6 @@ import (
"path"
"time"
kvi "github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
clientv3 "go.etcd.io/etcd/client/v3"
@ -589,7 +588,7 @@ func (kv *EtcdKV) KeepAlive(id clientv3.LeaseID) (<-chan *clientv3.LeaseKeepAliv
// CompareValueAndSwap compares the existing value with compare, and if they are
// equal, the target is stored in etcd.
func (kv *EtcdKV) CompareValueAndSwap(key, value, target string, opts ...clientv3.OpOption) error {
func (kv *EtcdKV) CompareValueAndSwap(key, value, target string, opts ...clientv3.OpOption) (bool, error) {
start := time.Now()
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
@ -600,18 +599,15 @@ func (kv *EtcdKV) CompareValueAndSwap(key, value, target string, opts ...clientv
value)).
Then(clientv3.OpPut(path.Join(kv.rootPath, key), target, opts...)).Commit()
if err != nil {
return err
}
if !resp.Succeeded {
return kvi.NewCompareFailedError(fmt.Errorf("function CompareAndSwap error for compare is false for key: %s", key))
return false, err
}
CheckElapseAndWarn(start, "Slow etcd operation compare value and swap")
return nil
return resp.Succeeded, nil
}
// CompareValueAndSwapBytes compares the existing value with compare, and if they are
// equal, the target is stored in etcd.
func (kv *EtcdKV) CompareValueAndSwapBytes(key string, value, target []byte, opts ...clientv3.OpOption) error {
func (kv *EtcdKV) CompareValueAndSwapBytes(key string, value, target []byte, opts ...clientv3.OpOption) (bool, error) {
start := time.Now()
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
@ -622,18 +618,15 @@ func (kv *EtcdKV) CompareValueAndSwapBytes(key string, value, target []byte, opt
string(value))).
Then(clientv3.OpPut(path.Join(kv.rootPath, key), string(target), opts...)).Commit()
if err != nil {
return err
}
if !resp.Succeeded {
return kvi.NewCompareFailedError(fmt.Errorf("function CompareAndSwap error for compare is false for key: %s", key))
return false, err
}
CheckElapseAndWarn(start, "Slow etcd operation compare value and swap")
return nil
return resp.Succeeded, nil
}
// CompareVersionAndSwap compares the existing key-value's version with version, and if
// they are equal, the target is stored in etcd.
func (kv *EtcdKV) CompareVersionAndSwap(key string, source int64, target string, opts ...clientv3.OpOption) error {
func (kv *EtcdKV) CompareVersionAndSwap(key string, source int64, target string, opts ...clientv3.OpOption) (bool, error) {
start := time.Now()
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
@ -644,19 +637,15 @@ func (kv *EtcdKV) CompareVersionAndSwap(key string, source int64, target string,
source)).
Then(clientv3.OpPut(path.Join(kv.rootPath, key), target, opts...)).Commit()
if err != nil {
return err
}
if !resp.Succeeded {
return kvi.NewCompareFailedError(fmt.Errorf("function CompareAndSwap error for compare is false for key: %s,"+
" source version: %d, target version: %s", key, source, target))
return false, err
}
CheckElapseAndWarn(start, "Slow etcd operation compare version and swap")
return nil
return resp.Succeeded, nil
}
// CompareVersionAndSwapBytes compares the existing key-value's version with version, and if
// they are equal, the target is stored in etcd.
func (kv *EtcdKV) CompareVersionAndSwapBytes(key string, source int64, target []byte, opts ...clientv3.OpOption) error {
func (kv *EtcdKV) CompareVersionAndSwapBytes(key string, source int64, target []byte, opts ...clientv3.OpOption) (bool, error) {
start := time.Now()
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
@ -667,14 +656,10 @@ func (kv *EtcdKV) CompareVersionAndSwapBytes(key string, source int64, target []
source)).
Then(clientv3.OpPut(path.Join(kv.rootPath, key), string(target), opts...)).Commit()
if err != nil {
return err
}
if !resp.Succeeded {
return kvi.NewCompareFailedError(fmt.Errorf("function CompareAndSwap error for compare is false for key: %s,"+
" source version: %d, target version: %s", key, source, target))
return false, err
}
CheckElapseAndWarn(start, "Slow etcd operation compare version and swap")
return nil
return resp.Succeeded, nil
}
// CheckElapseAndWarn checks the elapsed time and warns if it is too long.

View File

@ -17,12 +17,10 @@
package etcdkv_test
import (
"errors"
"os"
"testing"
"time"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/paramtable"
@ -732,24 +730,25 @@ func TestEtcdKV_Load(te *testing.T) {
assert.Equal(t, revision+1, resp.Header.Revision)
}
var compareErr *kv.CompareFailedError
err = etcdKV.CompareVersionAndSwap("a/b/c", 0, "1")
success, err := etcdKV.CompareVersionAndSwap("a/b/c", 0, "1")
assert.NoError(t, err)
assert.True(t, success)
value, err := etcdKV.Load("a/b/c")
assert.NoError(t, err)
assert.Equal(t, value, "1")
err = etcdKV.CompareVersionAndSwap("a/b/c", 0, "1")
assert.Error(t, err)
assert.True(t, errors.As(err, &compareErr))
success, err = etcdKV.CompareVersionAndSwap("a/b/c", 0, "1")
assert.NoError(t, err)
assert.False(t, success)
err = etcdKV.CompareValueAndSwap("a/b/c", "1", "2")
success, err = etcdKV.CompareValueAndSwap("a/b/c", "1", "2")
assert.True(t, success)
assert.NoError(t, err)
err = etcdKV.CompareValueAndSwap("a/b/c", "1", "2")
assert.Error(t, err)
assert.True(t, errors.As(err, &compareErr))
success, err = etcdKV.CompareValueAndSwap("a/b/c", "1", "2")
assert.NoError(t, err)
assert.False(t, success)
})
te.Run("Etcd Revision Bytes", func(t *testing.T) {
@ -784,24 +783,25 @@ func TestEtcdKV_Load(te *testing.T) {
assert.Equal(t, revision+1, resp.Header.Revision)
}
var compareErr *kv.CompareFailedError
err = etcdKV.CompareVersionAndSwapBytes("a/b/c", 0, []byte("1"))
success, err := etcdKV.CompareVersionAndSwapBytes("a/b/c", 0, []byte("1"))
assert.NoError(t, err)
assert.True(t, success)
value, err := etcdKV.LoadBytes("a/b/c")
assert.NoError(t, err)
assert.Equal(t, string(value), "1")
err = etcdKV.CompareVersionAndSwapBytes("a/b/c", 0, []byte("1"))
assert.Error(t, err)
assert.True(t, errors.As(err, &compareErr))
success, err = etcdKV.CompareVersionAndSwapBytes("a/b/c", 0, []byte("1"))
assert.NoError(t, err)
assert.False(t, success)
err = etcdKV.CompareValueAndSwapBytes("a/b/c", []byte("1"), []byte("2"))
success, err = etcdKV.CompareValueAndSwapBytes("a/b/c", []byte("1"), []byte("2"))
assert.True(t, success)
assert.NoError(t, err)
err = etcdKV.CompareValueAndSwapBytes("a/b/c", []byte("1"), []byte("2"))
assert.Error(t, err)
assert.True(t, errors.As(err, &compareErr))
success, err = etcdKV.CompareValueAndSwapBytes("a/b/c", []byte("1"), []byte("2"))
assert.NoError(t, err)
assert.False(t, success)
})
te.Run("Etcd Lease", func(t *testing.T) {

View File

@ -73,8 +73,8 @@ type MetaKv interface {
SaveWithIgnoreLease(key, value string) error
Grant(ttl int64) (id clientv3.LeaseID, err error)
KeepAlive(id clientv3.LeaseID) (<-chan *clientv3.LeaseKeepAliveResponse, error)
CompareValueAndSwap(key, value, target string, opts ...clientv3.OpOption) error
CompareVersionAndSwap(key string, version int64, target string, opts ...clientv3.OpOption) error
CompareValueAndSwap(key, value, target string, opts ...clientv3.OpOption) (bool, error)
CompareVersionAndSwap(key string, version int64, target string, opts ...clientv3.OpOption) (bool, error)
}
// SnapShotKV is TxnKV for snapshot data. It must save timestamp.

View File

@ -147,10 +147,10 @@ func (m *MockMetaKV) KeepAlive(id clientv3.LeaseID) (<-chan *clientv3.LeaseKeepA
panic("not implemented") // TODO: Implement
}
func (m *MockMetaKV) CompareValueAndSwap(key, value, target string, opts ...clientv3.OpOption) error {
func (m *MockMetaKV) CompareValueAndSwap(key, value, target string, opts ...clientv3.OpOption) (bool, error) {
panic("not implemented") // TODO: Implement
}
func (m *MockMetaKV) CompareVersionAndSwap(key string, version int64, target string, opts ...clientv3.OpOption) error {
func (m *MockMetaKV) CompareVersionAndSwap(key string, version int64, target string, opts ...clientv3.OpOption) (bool, error) {
panic("not implemented") // TODO: Implement
}

View File

@ -125,11 +125,11 @@ func (m *mockMetaKV) KeepAlive(id clientv3.LeaseID) (<-chan *clientv3.LeaseKeepA
panic("not implemented") // TODO: Implement
}
func (m *mockMetaKV) CompareValueAndSwap(key string, value string, target string, opts ...clientv3.OpOption) error {
func (m *mockMetaKV) CompareValueAndSwap(key string, value string, target string, opts ...clientv3.OpOption) (bool, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockMetaKV) CompareVersionAndSwap(key string, version int64, target string, opts ...clientv3.OpOption) error {
func (m *mockMetaKV) CompareVersionAndSwap(key string, version int64, target string, opts ...clientv3.OpOption) (bool, error) {
panic("not implemented") // TODO: Implement
}