mirror of https://github.com/milvus-io/milvus.git
Change etcdkv clientv3 into MetaKv interface (#10903)
This pr: - changed etcdkv clientv3 into MetaKv interface - replaced fmt.Sprintf with path.Join for kv key See also: #8058 Signed-off-by: yangxuan <xuan.yang@zilliz.com>pull/10826/head
parent
881e547802
commit
1e3fc5076f
|
@ -25,6 +25,7 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"math/rand"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
@ -37,6 +38,7 @@ import (
|
|||
"go.uber.org/zap"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/milvus-io/milvus/internal/kv"
|
||||
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/logutil"
|
||||
|
@ -105,8 +107,8 @@ type DataNode struct {
|
|||
rootCoord types.RootCoord
|
||||
dataCoord types.DataCoord
|
||||
|
||||
session *sessionutil.Session
|
||||
kvClient *etcdkv.EtcdKV
|
||||
session *sessionutil.Session
|
||||
watchKv kv.MetaKv
|
||||
|
||||
closer io.Closer
|
||||
|
||||
|
@ -189,7 +191,7 @@ func (node *DataNode) Register() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Init function do nothing now.
|
||||
// Init function does nothing now.
|
||||
func (node *DataNode) Init() error {
|
||||
log.Debug("DataNode Init",
|
||||
zap.String("SegmentStatisticsChannelName", Params.SegmentStatisticsChannelName),
|
||||
|
@ -203,8 +205,8 @@ func (node *DataNode) Init() error {
|
|||
func (node *DataNode) StartWatchChannels(ctx context.Context) {
|
||||
defer logutil.LogPanic()
|
||||
// REF MEP#7 watch path should be [prefix]/channel/{node_id}/{channel_name}
|
||||
watchPrefix := fmt.Sprintf("%s/%d", Params.ChannelWatchSubPath, node.NodeID)
|
||||
evtChan := node.kvClient.WatchWithPrefix(watchPrefix)
|
||||
watchPrefix := path.Join(Params.ChannelWatchSubPath, fmt.Sprintf("%d", node.NodeID))
|
||||
evtChan := node.watchKv.WatchWithPrefix(watchPrefix)
|
||||
// after watch, first check all exists nodes first
|
||||
err := node.checkWatchedList()
|
||||
if err != nil {
|
||||
|
@ -243,8 +245,8 @@ func (node *DataNode) StartWatchChannels(ctx context.Context) {
|
|||
// serves the corner case for etcd connection lost and missing some events
|
||||
func (node *DataNode) checkWatchedList() error {
|
||||
// REF MEP#7 watch path should be [prefix]/channel/{node_id}/{channel_name}
|
||||
prefix := fmt.Sprintf("%s/%d", Params.ChannelWatchSubPath, node.NodeID)
|
||||
keys, values, err := node.kvClient.LoadWithPrefix(prefix)
|
||||
prefix := path.Join(Params.ChannelWatchSubPath, fmt.Sprintf("%d", node.NodeID))
|
||||
keys, values, err := node.watchKv.LoadWithPrefix(prefix)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -291,7 +293,8 @@ func (node *DataNode) handleWatchInfo(key string, data []byte) {
|
|||
log.Warn("fail to Marshal watchInfo", zap.String("key", key), zap.Error(err))
|
||||
return
|
||||
}
|
||||
err = node.kvClient.Save(fmt.Sprintf("%s/%d/%s", Params.ChannelWatchSubPath, node.NodeID, watchInfo.Vchan.ChannelName), string(v))
|
||||
k := path.Join(Params.ChannelWatchSubPath, fmt.Sprintf("%d", node.NodeID), watchInfo.GetVchan().GetChannelName())
|
||||
err = node.watchKv.Save(k, string(v))
|
||||
if err != nil {
|
||||
log.Warn("fail to change WatchState to complete", zap.String("key", key), zap.Error(err))
|
||||
node.ReleaseDataSyncService(key)
|
||||
|
@ -395,7 +398,7 @@ func (node *DataNode) Start() error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
node.kvClient = etcdKV
|
||||
node.watchKv = etcdKV
|
||||
return nil
|
||||
}
|
||||
err = retry.Do(node.ctx, connectEtcdFn, retry.Attempts(ConnectEtcdMaxRetryTime))
|
||||
|
|
Loading…
Reference in New Issue