diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 0153ebc412..f626752871 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -79,8 +79,6 @@ const ( ConnectEtcdMaxRetryTime = 100 ) -const illegalRequestErrStr = "Illegal request" - // makes sure DataNode implements types.DataNode var _ types.DataNode = (*DataNode)(nil) @@ -382,11 +380,9 @@ func (node *DataNode) handlePutEvent(watchInfo *datapb.ChannelWatchInfo, version watchInfo.State = datapb.ChannelWatchState_WatchSuccess case datapb.ChannelWatchState_ToRelease: - if node.tryToReleaseFlowgraph(vChanName) { - watchInfo.State = datapb.ChannelWatchState_ReleaseSuccess - } else { - watchInfo.State = datapb.ChannelWatchState_ReleaseFailure - } + // there is no reason why we release fail + node.tryToReleaseFlowgraph(vChanName) + watchInfo.State = datapb.ChannelWatchState_ReleaseSuccess } v, err := proto.Marshal(watchInfo) @@ -394,32 +390,37 @@ func (node *DataNode) handlePutEvent(watchInfo *datapb.ChannelWatchInfo, version return fmt.Errorf("fail to marshal watchInfo with state, vChanName: %s, state: %s ,err: %w", vChanName, watchInfo.State.String(), err) } - k := path.Join(Params.DataNodeCfg.ChannelWatchSubPath, fmt.Sprintf("%d", Params.DataNodeCfg.GetNodeID()), vChanName) + key := path.Join(Params.DataNodeCfg.ChannelWatchSubPath, fmt.Sprintf("%d", Params.DataNodeCfg.GetNodeID()), vChanName) - log.Debug("handle put event: try to save result state", zap.String("key", k), zap.String("state", watchInfo.State.String())) - err = node.watchKv.CompareVersionAndSwap(k, version, string(v)) + success, err := node.watchKv.CompareVersionAndSwap(key, version, string(v)) + // etcd error, retrying if err != nil { - return fmt.Errorf("fail to update watch state to etcd, vChanName: %s, state: %s, err: %w", vChanName, watchInfo.State.String(), err) + // flow graph will leak if not release, causing new datanode failed to subscribe + node.tryToReleaseFlowgraph(vChanName) + log.Warn("fail to update watch state to etcd", zap.String("vChanName", vChanName), + zap.String("state", watchInfo.State.String()), zap.Error(err)) + return err } + // etcd valid but the states updated. + if !success { + log.Info("handle put event: failed to compare version and swap, release flowgraph", + zap.String("key", key), zap.String("state", watchInfo.State.String())) + // flow graph will leak if not release, causing new datanode failed to subscribe + node.tryToReleaseFlowgraph(vChanName) + return nil + } + log.Info("handle put event successfully", zap.String("key", key), zap.String("state", watchInfo.State.String())) return nil } -func (node *DataNode) handleDeleteEvent(vChanName string) bool { - return node.tryToReleaseFlowgraph(vChanName) +func (node *DataNode) handleDeleteEvent(vChanName string) { + node.tryToReleaseFlowgraph(vChanName) } -// tryToReleaseFlowgraph tries to release a flowgraph, returns false if failed -func (node *DataNode) tryToReleaseFlowgraph(vChanName string) bool { - success := true - defer func() { - if x := recover(); x != nil { - log.Error("release flowgraph panic", zap.String("vChanName", vChanName), zap.Any("recovered", x)) - success = false - } - }() +// tryToReleaseFlowgraph tries to release a flowgraph +func (node *DataNode) tryToReleaseFlowgraph(vChanName string) { node.flowgraphManager.release(vChanName) log.Info("try to release flowgraph success", zap.String("vChanName", vChanName)) - return success } // BackGroundGC runs in background to release datanode resources diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index 33e0e7bb12..eb3c093bc1 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -757,10 +757,9 @@ func TestWatchChannel(t *testing.T) { chPut <- struct{}{} return r }, - func(vChan string) bool { + func(vChan string) { node.handleDeleteEvent(vChan) chDel <- struct{}{} - return true }, time.Millisecond*100, ) node.eventManagerMap.Store(ch, m) @@ -795,10 +794,9 @@ func TestWatchChannel(t *testing.T) { chPut <- struct{}{} return r }, - func(vChan string) bool { + func(vChan string) { node.handleDeleteEvent(vChan) chDel <- struct{}{} - return true }, time.Millisecond*100, ) node.eventManagerMap.Store(ch, m) diff --git a/internal/datanode/event_manager.go b/internal/datanode/event_manager.go index 2fe69451f2..9ad4ed41d2 100644 --- a/internal/datanode/event_manager.go +++ b/internal/datanode/event_manager.go @@ -40,7 +40,7 @@ type channelEventManager struct { eventChan chan event closeChan chan struct{} handlePutEvent func(watchInfo *datapb.ChannelWatchInfo, version int64) error // node.handlePutEvent - handleDeleteEvent func(vChanName string) bool // node.handleDeleteEvent + handleDeleteEvent func(vChanName string) // node.handleDeleteEvent retryInterval time.Duration } @@ -50,7 +50,7 @@ const ( ) func newChannelEventManager(handlePut func(*datapb.ChannelWatchInfo, int64) error, - handleDel func(string) bool, retryInterval time.Duration) *channelEventManager { + handleDel func(string), retryInterval time.Duration) *channelEventManager { return &channelEventManager{ eventChan: make(chan event, 10), closeChan: make(chan struct{}), @@ -130,9 +130,11 @@ func (e *channelEventManager) retryHandlePutEvent(event event) { } err = e.handlePutEvent(event.info, event.version) - if err == nil { - log.Info("retry to handle put event successfully", - zap.String("vChanName", event.vChanName)) + if err != nil { + log.Warn("failed to handle put event", zap.String("vChanName", event.vChanName), zap.Error(err)) + // no need to retry here, + } else { + log.Info("handle put event successfully", zap.String("vChanName", event.vChanName)) return } } diff --git a/internal/datanode/event_manager_test.go b/internal/datanode/event_manager_test.go index a1fa323e4a..39f2907b17 100644 --- a/internal/datanode/event_manager_test.go +++ b/internal/datanode/event_manager_test.go @@ -34,7 +34,7 @@ func TestChannelEventManager(t *testing.T) { ran = true ch <- struct{}{} return nil - }, func(name string) bool { return true }, time.Millisecond*10) + }, func(name string) {}, time.Millisecond*10) em.Run() em.handleEvent(event{ @@ -56,7 +56,7 @@ func TestChannelEventManager(t *testing.T) { ran = true ch <- struct{}{} return nil - }, func(name string) bool { return true }, time.Millisecond*10) + }, func(name string) {}, time.Millisecond*10) em.Run() em.handleEvent(event{ @@ -89,7 +89,7 @@ func TestChannelEventManager(t *testing.T) { } return errors.New("mocked error") - }, func(name string) bool { return true }, time.Millisecond*10) + }, func(name string) {}, time.Millisecond*10) em.Run() em.handleEvent(event{ @@ -107,7 +107,7 @@ func TestChannelEventManager(t *testing.T) { t.Run("retry until timeout", func(t *testing.T) { em := newChannelEventManager(func(info *datapb.ChannelWatchInfo, version int64) error { return errors.New("mocked error") - }, func(name string) bool { return true }, time.Millisecond*100) + }, func(name string) {}, time.Millisecond*100) ch := make(chan struct{}, 1) @@ -136,7 +136,7 @@ func TestChannelEventManager(t *testing.T) { ch := make(chan struct{}, 1) em := newChannelEventManager(func(info *datapb.ChannelWatchInfo, version int64) error { return errors.New("mocked error") - }, func(name string) bool { return true }, time.Millisecond*10) + }, func(name string) {}, time.Millisecond*10) go func() { ddl := time.Now().Add(time.Minute) @@ -172,10 +172,9 @@ func TestChannelEventManager(t *testing.T) { func(info *datapb.ChannelWatchInfo, version int64) error { return errors.New("mocked error") }, - func(name string) bool { + func(name string) { ran = true ch <- struct{}{} - return true }, time.Millisecond*10, ) @@ -212,9 +211,7 @@ func TestChannelEventManager(t *testing.T) { } return errors.New("mocked error") }, - func(name string) bool { - return false - }, + func(name string) {}, time.Millisecond*10) em.Run() em.handleEvent(event{ @@ -253,9 +250,7 @@ func TestChannelEventManager(t *testing.T) { func(info *datapb.ChannelWatchInfo, version int64) error { return errors.New("mocked error") }, - func(name string) bool { - return false - }, + func(name string) {}, time.Millisecond*100, ) diff --git a/internal/indexcoord/meta_table.go b/internal/indexcoord/meta_table.go index b68934095a..917bf77ef0 100644 --- a/internal/indexcoord/meta_table.go +++ b/internal/indexcoord/meta_table.go @@ -42,8 +42,8 @@ import ( // revision: The number of times IndexMeta has been changed in etcd. It's the same as Event.Kv.Version in etcd. // indexMeta: A structure that records the state of the index defined by proto. type Meta struct { - indexMeta *indexpb.IndexMeta - revision int64 + indexMeta *indexpb.IndexMeta + indexVersion int64 } // metaTable records the mapping of IndexBuildID to Meta. @@ -91,8 +91,8 @@ func (mt *metaTable) reloadFromKV() error { } meta := &Meta{ - indexMeta: &indexMeta, - revision: versions[i], + indexMeta: &indexMeta, + indexVersion: versions[i], } mt.indexBuildID2Meta[indexMeta.IndexBuildID] = *meta } @@ -107,15 +107,19 @@ func (mt *metaTable) saveIndexMeta(meta *Meta) error { return err } key := path.Join(indexFilePrefix, strconv.FormatInt(meta.indexMeta.IndexBuildID, 10)) - err = mt.client.CompareVersionAndSwap(key, meta.revision, string(value)) - log.Debug("IndexCoord metaTable saveIndexMeta ", zap.String("key", key), zap.Error(err)) + success, err := mt.client.CompareVersionAndSwap(key, meta.indexVersion, string(value)) if err != nil { + // TODO, we don't need to reload if it is just etcd error + log.Warn("failed to save index meta in etcd", zap.Int64("buildID", meta.indexMeta.IndexBuildID), zap.Error(err)) return err } - meta.revision = meta.revision + 1 + if !success { + log.Warn("failed to save index meta in etcd because version compare failure", zap.Int64("buildID", meta.indexMeta.IndexBuildID), zap.Any("index", meta.indexMeta)) + return fmt.Errorf("failed to save index meta in etcd, buildId: %d, source version: %d", meta.indexMeta.IndexBuildID, meta.indexVersion) + } + meta.indexVersion = meta.indexVersion + 1 mt.indexBuildID2Meta[meta.indexMeta.IndexBuildID] = *meta - log.Debug("IndexCoord metaTable saveIndexMeta success", zap.Any("meta.revision", meta.revision)) - + log.Info("IndexCoord metaTable saveIndexMeta success", zap.Int64("buildID", meta.indexMeta.IndexBuildID), zap.Any("meta.revision", meta.indexVersion)) return nil } @@ -141,8 +145,8 @@ func (mt *metaTable) reloadMeta(indexBuildID UniqueID) (*Meta, error) { // return nil, nil //} m := &Meta{ - revision: version[0], - indexMeta: im, + indexVersion: version[0], + indexMeta: im, } return m, nil @@ -165,7 +169,7 @@ func (mt *metaTable) AddIndex(indexBuildID UniqueID, req *indexpb.BuildIndexRequ NodeID: 0, Version: 0, }, - revision: 0, + indexVersion: 0, } metrics.IndexCoordIndexTaskCounter.WithLabelValues(metrics.UnissuedIndexTaskLabel).Inc() return mt.saveIndexMeta(meta) @@ -298,8 +302,8 @@ func (mt *metaTable) MarkIndexAsDeletedByBuildIDs(buildIDs []UniqueID) error { for _, buildID := range buildIDs { if meta, ok := mt.indexBuildID2Meta[buildID]; ok { clonedMeta := &Meta{ - indexMeta: proto.Clone(meta.indexMeta).(*indexpb.IndexMeta), - revision: meta.revision, + indexMeta: proto.Clone(meta.indexMeta).(*indexpb.IndexMeta), + indexVersion: meta.indexVersion, } clonedMeta.indexMeta.MarkDeleted = true // marshal inside @@ -439,10 +443,10 @@ func (mt *metaTable) GetUnusedIndexFiles(limit int) []Meta { var metas []Meta for _, meta := range mt.indexBuildID2Meta { if meta.indexMeta.State == commonpb.IndexState_Finished && (meta.indexMeta.MarkDeleted || !meta.indexMeta.Recycled) { - metas = append(metas, Meta{indexMeta: proto.Clone(meta.indexMeta).(*indexpb.IndexMeta), revision: meta.revision}) + metas = append(metas, Meta{indexMeta: proto.Clone(meta.indexMeta).(*indexpb.IndexMeta), indexVersion: meta.indexVersion}) } if meta.indexMeta.State == commonpb.IndexState_Unissued && meta.indexMeta.MarkDeleted { - metas = append(metas, Meta{indexMeta: proto.Clone(meta.indexMeta).(*indexpb.IndexMeta), revision: meta.revision}) + metas = append(metas, Meta{indexMeta: proto.Clone(meta.indexMeta).(*indexpb.IndexMeta), indexVersion: meta.indexVersion}) } if len(metas) >= limit { return metas @@ -473,7 +477,7 @@ func (mt *metaTable) GetUnassignedTasks(onlineNodeIDs []int64) []Meta { continue } if meta.indexMeta.State == commonpb.IndexState_Unissued { - metas = append(metas, Meta{indexMeta: proto.Clone(meta.indexMeta).(*indexpb.IndexMeta), revision: meta.revision}) + metas = append(metas, Meta{indexMeta: proto.Clone(meta.indexMeta).(*indexpb.IndexMeta), indexVersion: meta.indexVersion}) continue } if meta.indexMeta.State == commonpb.IndexState_Finished || meta.indexMeta.State == commonpb.IndexState_Failed { @@ -488,7 +492,7 @@ func (mt *metaTable) GetUnassignedTasks(onlineNodeIDs []int64) []Meta { } if !alive { log.Info("Reassign because node no longer alive", zap.Any("onlineID", onlineNodeIDs), zap.Int64("nodeID", meta.indexMeta.NodeID)) - metas = append(metas, Meta{indexMeta: proto.Clone(meta.indexMeta).(*indexpb.IndexMeta), revision: meta.revision}) + metas = append(metas, Meta{indexMeta: proto.Clone(meta.indexMeta).(*indexpb.IndexMeta), indexVersion: meta.indexVersion}) } } return sortMetaPolicy(metas) @@ -564,18 +568,18 @@ func (mt *metaTable) HasSameReq(req *indexpb.BuildIndexRequest) (bool, UniqueID) // LoadMetaFromETCD load the meta of specified indexBuildID from ETCD. // If the version of meta in memory is greater equal to the version in ETCD, no need to reload. -func (mt *metaTable) LoadMetaFromETCD(indexBuildID int64, revision int64) bool { +func (mt *metaTable) LoadMetaFromETCD(indexBuildID int64, indexVersion int64) bool { mt.lock.Lock() defer mt.lock.Unlock() meta, ok := mt.indexBuildID2Meta[indexBuildID] log.Debug("IndexCoord metaTable LoadMetaFromETCD", zap.Int64("indexBuildID", indexBuildID), - zap.Int64("revision", revision), zap.Bool("ok", ok)) + zap.Int64("indexVersion", indexVersion), zap.Bool("ok", ok)) if ok { log.Debug("IndexCoord metaTable LoadMetaFromETCD", - zap.Int64("meta.revision", meta.revision), - zap.Int64("revision", revision)) + zap.Int64("meta.indexVersion", meta.indexVersion), + zap.Int64("indexVersion", indexVersion)) - if meta.revision >= revision { + if meta.indexVersion >= indexVersion { return false } } else { diff --git a/internal/indexcoord/meta_table_test.go b/internal/indexcoord/meta_table_test.go index 2d35229dbc..15cc34644e 100644 --- a/internal/indexcoord/meta_table_test.go +++ b/internal/indexcoord/meta_table_test.go @@ -70,8 +70,8 @@ func TestMetaTable(t *testing.T) { t.Run("saveIndexMeta", func(t *testing.T) { meta := &Meta{ - indexMeta: indexMeta1, - revision: 10, + indexMeta: indexMeta1, + indexVersion: 10, } err = metaTable.saveIndexMeta(meta) assert.NotNil(t, err) diff --git a/internal/indexnode/indexnode_mock.go b/internal/indexnode/indexnode_mock.go index f902fa0fbd..2fa1f264c0 100644 --- a/internal/indexnode/indexnode_mock.go +++ b/internal/indexnode/indexnode_mock.go @@ -19,6 +19,7 @@ package indexnode import ( "context" "errors" + "fmt" "sync" "go.uber.org/zap" @@ -89,11 +90,15 @@ func (inm *Mock) buildIndexTask() { if err != nil { return err } - err = inm.etcdKV.CompareVersionAndSwap(req.MetaPath, versions[0], - string(metaData)) + success, err := inm.etcdKV.CompareVersionAndSwap(req.MetaPath, versions[0], string(metaData)) if err != nil { + // TODO, we don't need to reload if it is just etcd error + log.Warn("failed to compare and swap in etcd", zap.Int64("buildID", req.IndexBuildID), zap.Error(err)) return err } + if !success { + return fmt.Errorf("failed to save index meta in etcd, buildId: %d, source version: %d", req.IndexBuildID, versions[0]) + } return nil } err := retry.Do(context.Background(), saveIndexMeta, retry.Attempts(3)) @@ -117,11 +122,16 @@ func (inm *Mock) buildIndexTask() { if err != nil { return err } - err = inm.etcdKV.CompareVersionAndSwap(req.MetaPath, versions[0], - string(metaData)) + + success, err := inm.etcdKV.CompareVersionAndSwap(req.MetaPath, versions[0], string(metaData)) if err != nil { + // TODO, we don't need to reload if it is just etcd error + log.Warn("failed to compare and swap in etcd", zap.Int64("buildID", req.IndexBuildID), zap.Error(err)) return err } + if !success { + return fmt.Errorf("failed to save index meta in etcd, buildId: %d, source version: %d", req.IndexBuildID, versions[0]) + } indexMeta2 := indexpb.IndexMeta{} _, values2, versions2, err := inm.etcdKV.LoadWithPrefix2(req.MetaPath) @@ -139,11 +149,15 @@ func (inm *Mock) buildIndexTask() { if err != nil { return err } - err = inm.etcdKV.CompareVersionAndSwap(req.MetaPath, versions2[0], - string(metaData2)) + success, err = inm.etcdKV.CompareVersionAndSwap(req.MetaPath, versions2[0], string(metaData2)) if err != nil { + // TODO, we don't need to reload if it is just etcd error + log.Warn("failed to compare and swap in etcd", zap.Int64("buildID", req.IndexBuildID), zap.Error(err)) return err } + if !success { + return fmt.Errorf("failed to save index meta in etcd, buildId: %d, source version: %d", req.IndexBuildID, versions[0]) + } return nil } err := retry.Do(context.Background(), saveIndexMeta, retry.Attempts(3)) diff --git a/internal/indexnode/task.go b/internal/indexnode/task.go index 675cbaef85..b9520873aa 100644 --- a/internal/indexnode/task.go +++ b/internal/indexnode/task.go @@ -196,8 +196,11 @@ func (it *IndexBuildTask) loadIndexMeta(ctx context.Context) (*indexpb.IndexMeta return &indexMeta, source, nil } -func (it *IndexBuildTask) updateTaskState(indexMeta *indexpb.IndexMeta) TaskState { - if indexMeta.Version > it.req.Version || indexMeta.State == commonpb.IndexState_Finished { +func (it *IndexBuildTask) updateTaskState(indexMeta *indexpb.IndexMeta, err error) TaskState { + if err != nil { + log.Warn("IndexNode IndexBuildTask internal err, mark the task as retry", zap.Int64("buildID", it.req.IndexBuildID), zap.Error(err)) + it.SetState(TaskStateRetry) + } else if indexMeta.Version > it.req.Version || indexMeta.State == commonpb.IndexState_Finished { it.SetState(TaskStateAbandon) } else if indexMeta.MarkDeleted { it.SetState(TaskStateDeleted) @@ -213,19 +216,15 @@ func (it *IndexBuildTask) saveIndexMeta(ctx context.Context) error { fn := func() error { indexMeta, version, err := it.loadIndexMeta(ctx) if err != nil { - errMsg := fmt.Sprintf("IndexNode IndexBuildTask saveIndexMeta fail to load index meta, IndexBuildID=%d", indexMeta.IndexBuildID) - panic(errMsg) + log.Info("IndexNode IndexBuildTask saveIndexMeta fail to load index meta,", zap.Int64("build Id", indexMeta.IndexBuildID), zap.Error(err)) + return err } - - if it.internalErr != nil { - log.Warn("IndexNode IndexBuildTask internal err is not nil, mark the task as retry", - zap.Int64("buildID", it.req.IndexBuildID)) - it.SetState(TaskStateRetry) - } - - taskState := it.updateTaskState(indexMeta) - - if taskState == TaskStateDeleted { + taskState := it.updateTaskState(indexMeta, it.internalErr) + if taskState == TaskStateAbandon { + log.Warn("IndexNode IndexBuildTask saveIndexMeta success because task abandon", zap.String("TaskState", taskState.String()), + zap.Int64("IndexBuildID", indexMeta.IndexBuildID)) + return nil + } else if taskState == TaskStateDeleted { log.Info("IndexNode IndexBuildTask saveIndexMeta", zap.String("TaskState", taskState.String()), zap.Int64("IndexBuildID", indexMeta.IndexBuildID)) indexMeta.State = commonpb.IndexState_Finished @@ -252,14 +251,22 @@ func (it *IndexBuildTask) saveIndexMeta(ctx context.Context) error { var metaValue []byte metaValue, err = proto.Marshal(indexMeta) if err != nil { - errMsg := fmt.Sprintf("IndexNode IndexBuildTask saveIndexMeta fail to marshal index meta, IndexBuildID=%d, err=%s", - indexMeta.IndexBuildID, err.Error()) - panic(errMsg) + log.Warn("IndexNode IndexBuildTask saveIndexMeta fail to marshal index meta,", zap.Int64("build Id", indexMeta.IndexBuildID), zap.Error(err)) + return err } strMetaValue := string(metaValue) - return it.etcdKV.CompareVersionAndSwap(it.req.MetaPath, version, strMetaValue) + success, err := it.etcdKV.CompareVersionAndSwap(it.req.MetaPath, version, strMetaValue) + if err != nil { + // TODO, we don't need to reload if it is just etcd error + log.Warn("failed to compare and swap in etcd", zap.Int64("buildID", it.req.IndexBuildID), zap.Error(err)) + return err + } + if !success { + return fmt.Errorf("failed to save index meta in etcd, buildId: %d, source version: %d", it.req.IndexBuildID, version) + } + return nil } err := retry.Do(ctx, fn, retry.Attempts(3)) @@ -279,7 +286,7 @@ func (it *IndexBuildTask) PreExecute(ctx context.Context) error { // assume that we can loadIndexMeta later... return nil } - it.updateTaskState(indexMeta) + it.updateTaskState(indexMeta, nil) return nil } diff --git a/internal/kv/etcd/embed_etcd_kv.go b/internal/kv/etcd/embed_etcd_kv.go index 08bfb112f8..613564caa2 100644 --- a/internal/kv/etcd/embed_etcd_kv.go +++ b/internal/kv/etcd/embed_etcd_kv.go @@ -30,7 +30,6 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/internal/kv" - kvi "github.com/milvus-io/milvus/internal/kv" "github.com/milvus-io/milvus/internal/log" ) @@ -564,7 +563,7 @@ func (kv *EmbedEtcdKV) KeepAlive(id clientv3.LeaseID) (<-chan *clientv3.LeaseKee // CompareValueAndSwap compares the existing value with compare, and if they are // equal, the target is stored in etcd. -func (kv *EmbedEtcdKV) CompareValueAndSwap(key, value, target string, opts ...clientv3.OpOption) error { +func (kv *EmbedEtcdKV) CompareValueAndSwap(key, value, target string, opts ...clientv3.OpOption) (bool, error) { ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() resp, err := kv.client.Txn(ctx).If( @@ -574,18 +573,14 @@ func (kv *EmbedEtcdKV) CompareValueAndSwap(key, value, target string, opts ...cl value)). Then(clientv3.OpPut(path.Join(kv.rootPath, key), target, opts...)).Commit() if err != nil { - return err + return false, err } - if !resp.Succeeded { - return kvi.NewCompareFailedError(fmt.Errorf("function CompareValueAndSwap error for compare is false for key: %s", key)) - } - - return nil + return resp.Succeeded, nil } // CompareValueAndSwapBytes compares the existing value with compare, and if they are // equal, the target is stored in etcd. -func (kv *EmbedEtcdKV) CompareValueAndSwapBytes(key string, value, target []byte, opts ...clientv3.OpOption) error { +func (kv *EmbedEtcdKV) CompareValueAndSwapBytes(key string, value, target []byte, opts ...clientv3.OpOption) (bool, error) { ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() resp, err := kv.client.Txn(ctx).If( @@ -595,18 +590,14 @@ func (kv *EmbedEtcdKV) CompareValueAndSwapBytes(key string, value, target []byte string(value))). Then(clientv3.OpPut(path.Join(kv.rootPath, key), string(target), opts...)).Commit() if err != nil { - return err + return false, err } - if !resp.Succeeded { - return kvi.NewCompareFailedError(fmt.Errorf("function CompareValueAndSwapBytes error for compare is false for key: %s", key)) - } - - return nil + return resp.Succeeded, nil } // CompareVersionAndSwap compares the existing key-value's version with version, and if // they are equal, the target is stored in etcd. -func (kv *EmbedEtcdKV) CompareVersionAndSwap(key string, version int64, target string, opts ...clientv3.OpOption) error { +func (kv *EmbedEtcdKV) CompareVersionAndSwap(key string, version int64, target string, opts ...clientv3.OpOption) (bool, error) { ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() resp, err := kv.client.Txn(ctx).If( @@ -616,18 +607,14 @@ func (kv *EmbedEtcdKV) CompareVersionAndSwap(key string, version int64, target s version)). Then(clientv3.OpPut(path.Join(kv.rootPath, key), target, opts...)).Commit() if err != nil { - return err + return false, err } - if !resp.Succeeded { - return kvi.NewCompareFailedError(fmt.Errorf("function CompareAndSwap error for compare is false for key: %s", key)) - } - - return nil + return resp.Succeeded, nil } // CompareVersionAndSwapBytes compares the existing key-value's version with version, and if // they are equal, the target is stored in etcd. -func (kv *EmbedEtcdKV) CompareVersionAndSwapBytes(key string, version int64, target []byte, opts ...clientv3.OpOption) error { +func (kv *EmbedEtcdKV) CompareVersionAndSwapBytes(key string, version int64, target []byte, opts ...clientv3.OpOption) (bool, error) { ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() resp, err := kv.client.Txn(ctx).If( @@ -637,13 +624,9 @@ func (kv *EmbedEtcdKV) CompareVersionAndSwapBytes(key string, version int64, tar version)). Then(clientv3.OpPut(path.Join(kv.rootPath, key), string(target), opts...)).Commit() if err != nil { - return err + return false, err } - if !resp.Succeeded { - return kvi.NewCompareFailedError(fmt.Errorf("function CompareVersionAndSwapBytes error for compare is false for key: %s", key)) - } - - return nil + return resp.Succeeded, nil } func (kv *EmbedEtcdKV) GetConfig() embed.Config { diff --git a/internal/kv/etcd/embed_etcd_kv_test.go b/internal/kv/etcd/embed_etcd_kv_test.go index 705a4e9b6e..9fe09e569e 100644 --- a/internal/kv/etcd/embed_etcd_kv_test.go +++ b/internal/kv/etcd/embed_etcd_kv_test.go @@ -17,12 +17,10 @@ package etcdkv_test import ( - "errors" "os" "testing" "time" - "github.com/milvus-io/milvus/internal/kv" "github.com/milvus-io/milvus/internal/util/metricsinfo" embed_etcd_kv "github.com/milvus-io/milvus/internal/kv/etcd" @@ -810,24 +808,25 @@ func TestEmbedEtcd(te *testing.T) { assert.Equal(t, revision+1, resp.Header.Revision) } - var compareErr *kv.CompareFailedError - err = metaKv.CompareVersionAndSwap("a/b/c", 0, "1") + success, err := metaKv.CompareVersionAndSwap("a/b/c", 0, "1") assert.NoError(t, err) + assert.True(t, success) value, err := metaKv.Load("a/b/c") assert.NoError(t, err) assert.Equal(t, value, "1") - err = metaKv.CompareVersionAndSwap("a/b/c", 0, "1") - assert.Error(t, err) - assert.True(t, errors.As(err, &compareErr)) + success, err = metaKv.CompareVersionAndSwap("a/b/c", 0, "1") + assert.NoError(t, err) + assert.False(t, success) - err = metaKv.CompareValueAndSwap("a/b/c", "1", "2") + success, err = metaKv.CompareValueAndSwap("a/b/c", "1", "2") + assert.True(t, success) assert.NoError(t, err) - err = metaKv.CompareValueAndSwap("a/b/c", "1", "2") - assert.Error(t, err) - assert.True(t, errors.As(err, &compareErr)) + success, err = metaKv.CompareValueAndSwap("a/b/c", "1", "2") + assert.NoError(t, err) + assert.False(t, success) }) te.Run("Etcd Revision Bytes", func(t *testing.T) { @@ -865,24 +864,25 @@ func TestEmbedEtcd(te *testing.T) { assert.Equal(t, revision+1, resp.Header.Revision) } - var compareErr *kv.CompareFailedError - err = metaKv.CompareVersionAndSwapBytes("a/b/c", 0, []byte("1")) + success, err := metaKv.CompareVersionAndSwapBytes("a/b/c", 0, []byte("1")) assert.NoError(t, err) + assert.True(t, success) value, err := metaKv.LoadBytes("a/b/c") assert.NoError(t, err) assert.Equal(t, value, []byte("1")) - err = metaKv.CompareVersionAndSwapBytes("a/b/c", 0, []byte("1")) - assert.Error(t, err) - assert.True(t, errors.As(err, &compareErr)) + success, err = metaKv.CompareVersionAndSwapBytes("a/b/c", 0, []byte("1")) + assert.NoError(t, err) + assert.False(t, success) - err = metaKv.CompareValueAndSwapBytes("a/b/c", []byte("1"), []byte("2")) + success, err = metaKv.CompareValueAndSwapBytes("a/b/c", []byte("1"), []byte("2")) + assert.True(t, success) assert.NoError(t, err) - err = metaKv.CompareValueAndSwapBytes("a/b/c", []byte("1"), []byte("2")) - assert.Error(t, err) - assert.True(t, errors.As(err, &compareErr)) + success, err = metaKv.CompareValueAndSwapBytes("a/b/c", []byte("1"), []byte("2")) + assert.NoError(t, err) + assert.False(t, success) }) diff --git a/internal/kv/etcd/etcd_kv.go b/internal/kv/etcd/etcd_kv.go index e7abd0ef74..084574217b 100644 --- a/internal/kv/etcd/etcd_kv.go +++ b/internal/kv/etcd/etcd_kv.go @@ -22,7 +22,6 @@ import ( "path" "time" - kvi "github.com/milvus-io/milvus/internal/kv" "github.com/milvus-io/milvus/internal/log" clientv3 "go.etcd.io/etcd/client/v3" @@ -589,7 +588,7 @@ func (kv *EtcdKV) KeepAlive(id clientv3.LeaseID) (<-chan *clientv3.LeaseKeepAliv // CompareValueAndSwap compares the existing value with compare, and if they are // equal, the target is stored in etcd. -func (kv *EtcdKV) CompareValueAndSwap(key, value, target string, opts ...clientv3.OpOption) error { +func (kv *EtcdKV) CompareValueAndSwap(key, value, target string, opts ...clientv3.OpOption) (bool, error) { start := time.Now() ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() @@ -600,18 +599,15 @@ func (kv *EtcdKV) CompareValueAndSwap(key, value, target string, opts ...clientv value)). Then(clientv3.OpPut(path.Join(kv.rootPath, key), target, opts...)).Commit() if err != nil { - return err - } - if !resp.Succeeded { - return kvi.NewCompareFailedError(fmt.Errorf("function CompareAndSwap error for compare is false for key: %s", key)) + return false, err } CheckElapseAndWarn(start, "Slow etcd operation compare value and swap") - return nil + return resp.Succeeded, nil } // CompareValueAndSwapBytes compares the existing value with compare, and if they are // equal, the target is stored in etcd. -func (kv *EtcdKV) CompareValueAndSwapBytes(key string, value, target []byte, opts ...clientv3.OpOption) error { +func (kv *EtcdKV) CompareValueAndSwapBytes(key string, value, target []byte, opts ...clientv3.OpOption) (bool, error) { start := time.Now() ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() @@ -622,18 +618,15 @@ func (kv *EtcdKV) CompareValueAndSwapBytes(key string, value, target []byte, opt string(value))). Then(clientv3.OpPut(path.Join(kv.rootPath, key), string(target), opts...)).Commit() if err != nil { - return err - } - if !resp.Succeeded { - return kvi.NewCompareFailedError(fmt.Errorf("function CompareAndSwap error for compare is false for key: %s", key)) + return false, err } CheckElapseAndWarn(start, "Slow etcd operation compare value and swap") - return nil + return resp.Succeeded, nil } // CompareVersionAndSwap compares the existing key-value's version with version, and if // they are equal, the target is stored in etcd. -func (kv *EtcdKV) CompareVersionAndSwap(key string, source int64, target string, opts ...clientv3.OpOption) error { +func (kv *EtcdKV) CompareVersionAndSwap(key string, source int64, target string, opts ...clientv3.OpOption) (bool, error) { start := time.Now() ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() @@ -644,19 +637,15 @@ func (kv *EtcdKV) CompareVersionAndSwap(key string, source int64, target string, source)). Then(clientv3.OpPut(path.Join(kv.rootPath, key), target, opts...)).Commit() if err != nil { - return err - } - if !resp.Succeeded { - return kvi.NewCompareFailedError(fmt.Errorf("function CompareAndSwap error for compare is false for key: %s,"+ - " source version: %d, target version: %s", key, source, target)) + return false, err } CheckElapseAndWarn(start, "Slow etcd operation compare version and swap") - return nil + return resp.Succeeded, nil } // CompareVersionAndSwapBytes compares the existing key-value's version with version, and if // they are equal, the target is stored in etcd. -func (kv *EtcdKV) CompareVersionAndSwapBytes(key string, source int64, target []byte, opts ...clientv3.OpOption) error { +func (kv *EtcdKV) CompareVersionAndSwapBytes(key string, source int64, target []byte, opts ...clientv3.OpOption) (bool, error) { start := time.Now() ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() @@ -667,14 +656,10 @@ func (kv *EtcdKV) CompareVersionAndSwapBytes(key string, source int64, target [] source)). Then(clientv3.OpPut(path.Join(kv.rootPath, key), string(target), opts...)).Commit() if err != nil { - return err - } - if !resp.Succeeded { - return kvi.NewCompareFailedError(fmt.Errorf("function CompareAndSwap error for compare is false for key: %s,"+ - " source version: %d, target version: %s", key, source, target)) + return false, err } CheckElapseAndWarn(start, "Slow etcd operation compare version and swap") - return nil + return resp.Succeeded, nil } // CheckElapseAndWarn checks the elapsed time and warns if it is too long. diff --git a/internal/kv/etcd/etcd_kv_test.go b/internal/kv/etcd/etcd_kv_test.go index 7e939ab5b3..aa8d6fb4e0 100644 --- a/internal/kv/etcd/etcd_kv_test.go +++ b/internal/kv/etcd/etcd_kv_test.go @@ -17,12 +17,10 @@ package etcdkv_test import ( - "errors" "os" "testing" "time" - "github.com/milvus-io/milvus/internal/kv" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/util/etcd" "github.com/milvus-io/milvus/internal/util/paramtable" @@ -732,24 +730,25 @@ func TestEtcdKV_Load(te *testing.T) { assert.Equal(t, revision+1, resp.Header.Revision) } - var compareErr *kv.CompareFailedError - err = etcdKV.CompareVersionAndSwap("a/b/c", 0, "1") + success, err := etcdKV.CompareVersionAndSwap("a/b/c", 0, "1") assert.NoError(t, err) + assert.True(t, success) value, err := etcdKV.Load("a/b/c") assert.NoError(t, err) assert.Equal(t, value, "1") - err = etcdKV.CompareVersionAndSwap("a/b/c", 0, "1") - assert.Error(t, err) - assert.True(t, errors.As(err, &compareErr)) + success, err = etcdKV.CompareVersionAndSwap("a/b/c", 0, "1") + assert.NoError(t, err) + assert.False(t, success) - err = etcdKV.CompareValueAndSwap("a/b/c", "1", "2") + success, err = etcdKV.CompareValueAndSwap("a/b/c", "1", "2") + assert.True(t, success) assert.NoError(t, err) - err = etcdKV.CompareValueAndSwap("a/b/c", "1", "2") - assert.Error(t, err) - assert.True(t, errors.As(err, &compareErr)) + success, err = etcdKV.CompareValueAndSwap("a/b/c", "1", "2") + assert.NoError(t, err) + assert.False(t, success) }) te.Run("Etcd Revision Bytes", func(t *testing.T) { @@ -784,24 +783,25 @@ func TestEtcdKV_Load(te *testing.T) { assert.Equal(t, revision+1, resp.Header.Revision) } - var compareErr *kv.CompareFailedError - err = etcdKV.CompareVersionAndSwapBytes("a/b/c", 0, []byte("1")) + success, err := etcdKV.CompareVersionAndSwapBytes("a/b/c", 0, []byte("1")) assert.NoError(t, err) + assert.True(t, success) value, err := etcdKV.LoadBytes("a/b/c") assert.NoError(t, err) assert.Equal(t, string(value), "1") - err = etcdKV.CompareVersionAndSwapBytes("a/b/c", 0, []byte("1")) - assert.Error(t, err) - assert.True(t, errors.As(err, &compareErr)) + success, err = etcdKV.CompareVersionAndSwapBytes("a/b/c", 0, []byte("1")) + assert.NoError(t, err) + assert.False(t, success) - err = etcdKV.CompareValueAndSwapBytes("a/b/c", []byte("1"), []byte("2")) + success, err = etcdKV.CompareValueAndSwapBytes("a/b/c", []byte("1"), []byte("2")) + assert.True(t, success) assert.NoError(t, err) - err = etcdKV.CompareValueAndSwapBytes("a/b/c", []byte("1"), []byte("2")) - assert.Error(t, err) - assert.True(t, errors.As(err, &compareErr)) + success, err = etcdKV.CompareValueAndSwapBytes("a/b/c", []byte("1"), []byte("2")) + assert.NoError(t, err) + assert.False(t, success) }) te.Run("Etcd Lease", func(t *testing.T) { diff --git a/internal/kv/kv.go b/internal/kv/kv.go index 524ffc1a54..e3e360c499 100644 --- a/internal/kv/kv.go +++ b/internal/kv/kv.go @@ -73,8 +73,8 @@ type MetaKv interface { SaveWithIgnoreLease(key, value string) error Grant(ttl int64) (id clientv3.LeaseID, err error) KeepAlive(id clientv3.LeaseID) (<-chan *clientv3.LeaseKeepAliveResponse, error) - CompareValueAndSwap(key, value, target string, opts ...clientv3.OpOption) error - CompareVersionAndSwap(key string, version int64, target string, opts ...clientv3.OpOption) error + CompareValueAndSwap(key, value, target string, opts ...clientv3.OpOption) (bool, error) + CompareVersionAndSwap(key string, version int64, target string, opts ...clientv3.OpOption) (bool, error) } // SnapShotKV is TxnKV for snapshot data. It must save timestamp. diff --git a/internal/kv/mock_kv.go b/internal/kv/mock_kv.go index 505e1ddbd0..90680c424c 100644 --- a/internal/kv/mock_kv.go +++ b/internal/kv/mock_kv.go @@ -147,10 +147,10 @@ func (m *MockMetaKV) KeepAlive(id clientv3.LeaseID) (<-chan *clientv3.LeaseKeepA panic("not implemented") // TODO: Implement } -func (m *MockMetaKV) CompareValueAndSwap(key, value, target string, opts ...clientv3.OpOption) error { +func (m *MockMetaKV) CompareValueAndSwap(key, value, target string, opts ...clientv3.OpOption) (bool, error) { panic("not implemented") // TODO: Implement } -func (m *MockMetaKV) CompareVersionAndSwap(key string, version int64, target string, opts ...clientv3.OpOption) error { +func (m *MockMetaKV) CompareVersionAndSwap(key string, version int64, target string, opts ...clientv3.OpOption) (bool, error) { panic("not implemented") // TODO: Implement } diff --git a/internal/querycoord/replica_test.go b/internal/querycoord/replica_test.go index 49ff333101..412d15de9e 100644 --- a/internal/querycoord/replica_test.go +++ b/internal/querycoord/replica_test.go @@ -125,11 +125,11 @@ func (m *mockMetaKV) KeepAlive(id clientv3.LeaseID) (<-chan *clientv3.LeaseKeepA panic("not implemented") // TODO: Implement } -func (m *mockMetaKV) CompareValueAndSwap(key string, value string, target string, opts ...clientv3.OpOption) error { +func (m *mockMetaKV) CompareValueAndSwap(key string, value string, target string, opts ...clientv3.OpOption) (bool, error) { panic("not implemented") // TODO: Implement } -func (m *mockMetaKV) CompareVersionAndSwap(key string, version int64, target string, opts ...clientv3.OpOption) error { +func (m *mockMetaKV) CompareVersionAndSwap(key string, version int64, target string, opts ...clientv3.OpOption) (bool, error) { panic("not implemented") // TODO: Implement }