// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datacoord

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/samber/lo"
	v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.uber.org/zap"

	"github.com/milvus-io/milvus/internal/kv"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/pkg/common"
	"github.com/milvus-io/milvus/pkg/log"
	"github.com/milvus-io/milvus/pkg/mq/msgstream"
	"github.com/milvus-io/milvus/pkg/util/funcutil"
	"github.com/milvus-io/milvus/pkg/util/logutil"
)

// ChannelManager manages which DataNode watches which channel.
// *ChannelManagerImpl in this file provides all of these methods.
type ChannelManager interface {
	// Startup reconciles the stored assignments with the currently online
	// nodes and starts the background checkers.
	Startup(ctx context.Context, nodes []int64) error
	// Close stops the background checkers started by Startup.
	Close()

	// AddNode registers a node and applies the register policy's updates.
	AddNode(nodeID int64) error
	// DeleteNode deregisters a node and moves its channels elsewhere.
	DeleteNode(nodeID int64) error
	// Watch assigns a channel according to the assign policy.
	Watch(ctx context.Context, ch RWChannel) error
	// RemoveChannel removes a channel's assignment; unknown channels are ignored.
	RemoveChannel(channelName string) error
	// Release marks a node's channel with the ToRelease watch state.
	Release(nodeID UniqueID, channelName string) error

	// Match reports whether channel is assigned to nodeID.
	Match(nodeID int64, channel string) bool
	// FindWatcher returns the node watching channel, or an error when the
	// channel is only in the buffer or not watched at all.
	FindWatcher(channel string) (int64, error)

	// GetNodeChannelsByCollectionID maps each assigned node to the names of
	// its channels that belong to collectionID.
	GetNodeChannelsByCollectionID(collectionID UniqueID) map[UniqueID][]string
	// GetChannelsByCollectionID returns the assigned channels of collectionID.
	GetChannelsByCollectionID(collectionID UniqueID) []RWChannel
	// GetCollectionIDByChannel looks up the collection of an assigned channel.
	GetCollectionIDByChannel(channel string) (bool, UniqueID)
	// GetNodeIDByChannelName looks up the node an assigned channel is on.
	GetNodeIDByChannelName(channel string) (bool, UniqueID)
}

// ChannelManagerImpl manages the allocation and the balance between channels and data nodes.
type ChannelManagerImpl struct { ctx context.Context mu sync.RWMutex h Handler store RWChannelStore factory ChannelPolicyFactory registerPolicy RegisterPolicy deregisterPolicy DeregisterPolicy assignPolicy ChannelAssignPolicy reassignPolicy ChannelReassignPolicy bgChecker ChannelBGChecker balancePolicy BalanceChannelPolicy msgstreamFactory msgstream.Factory stateChecker channelStateChecker stopChecker context.CancelFunc stateTimer *channelStateTimer lastActiveTimestamp time.Time } // ChannelManagerOpt is to set optional parameters in channel manager. type ChannelManagerOpt func(c *ChannelManagerImpl) func withFactory(f ChannelPolicyFactory) ChannelManagerOpt { return func(c *ChannelManagerImpl) { c.factory = f } } func withMsgstreamFactory(f msgstream.Factory) ChannelManagerOpt { return func(c *ChannelManagerImpl) { c.msgstreamFactory = f } } func withStateChecker() ChannelManagerOpt { return func(c *ChannelManagerImpl) { c.stateChecker = c.watchChannelStatesLoop } } func withBgChecker() ChannelManagerOpt { return func(c *ChannelManagerImpl) { c.bgChecker = c.bgCheckChannelsWork } } // NewChannelManager creates and returns a new ChannelManager instance. func NewChannelManager( kv kv.WatchKV, // for TxnKv, MetaKv and WatchKV h Handler, options ...ChannelManagerOpt, ) (*ChannelManagerImpl, error) { c := &ChannelManagerImpl{ ctx: context.TODO(), h: h, factory: NewChannelPolicyFactoryV1(kv), store: NewChannelStore(kv), stateTimer: newChannelStateTimer(kv), } if err := c.store.Reload(); err != nil { return nil, err } for _, opt := range options { opt(c) } c.registerPolicy = c.factory.NewRegisterPolicy() c.deregisterPolicy = c.factory.NewDeregisterPolicy() c.assignPolicy = c.factory.NewAssignPolicy() c.reassignPolicy = c.factory.NewReassignPolicy() c.balancePolicy = c.factory.NewBalancePolicy() c.lastActiveTimestamp = time.Now() return c, nil } // Startup adjusts the channel store according to current cluster states. 
func (c *ChannelManagerImpl) Startup(ctx context.Context, nodes []int64) error { c.ctx = ctx channels := c.store.GetNodesChannels() // Retrieve the current old nodes. oNodes := make([]int64, 0, len(channels)) for _, c := range channels { oNodes = append(oNodes, c.NodeID) } // Process watch states for old nodes. oldOnLines := c.getOldOnlines(nodes, oNodes) if err := c.checkOldNodes(oldOnLines); err != nil { return err } // Add new online nodes to the cluster. newOnLines := c.getNewOnLines(nodes, oNodes) for _, n := range newOnLines { if err := c.AddNode(n); err != nil { return err } } // Remove new offline nodes from the cluster. offLines := c.getOffLines(nodes, oNodes) for _, n := range offLines { if err := c.DeleteNode(n); err != nil { return err } } // Unwatch and drop channel with drop flag. c.unwatchDroppedChannels() checkerContext, cancel := context.WithCancel(ctx) c.stopChecker = cancel if c.stateChecker != nil { // TODO get revision from reload logic go c.stateChecker(checkerContext, common.LatestRevision) log.Info("starting etcd states checker") } if c.bgChecker != nil { go c.bgChecker(checkerContext) log.Info("starting background balance checker") } log.Info("cluster start up", zap.Int64s("nodes", nodes), zap.Int64s("oNodes", oNodes), zap.Int64s("old onlines", oldOnLines), zap.Int64s("new onlines", newOnLines), zap.Int64s("offLines", offLines)) return nil } // Close notifies the running checker. func (c *ChannelManagerImpl) Close() { if c.stopChecker != nil { c.stopChecker() } } // checkOldNodes processes the existing watch channels when starting up. 
// ToWatch get startTs and timeoutTs, start timer // WatchSuccess ignore // WatchFail ToRelease // ToRelase get startTs and timeoutTs, start timer // ReleaseSuccess remove // ReleaseFail clean up and remove func (c *ChannelManagerImpl) checkOldNodes(nodes []UniqueID) error { // Load all the watch infos before processing nodeWatchInfos := make(map[UniqueID][]*datapb.ChannelWatchInfo) for _, nodeID := range nodes { watchInfos, err := c.stateTimer.loadAllChannels(nodeID) if err != nil { return err } nodeWatchInfos[nodeID] = watchInfos } for nodeID, watchInfos := range nodeWatchInfos { for _, info := range watchInfos { channelName := info.GetVchan().GetChannelName() checkInterval := Params.DataCoordCfg.WatchTimeoutInterval.GetAsDuration(time.Second) log.Info("processing watch info", zap.String("watch state", info.GetState().String()), zap.String("channelName", channelName)) switch info.GetState() { case datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_Uncomplete: c.stateTimer.startOne(datapb.ChannelWatchState_ToWatch, channelName, nodeID, checkInterval) case datapb.ChannelWatchState_WatchFailure: if err := c.Release(nodeID, channelName); err != nil { return err } case datapb.ChannelWatchState_ToRelease: c.stateTimer.startOne(datapb.ChannelWatchState_ToRelease, channelName, nodeID, checkInterval) case datapb.ChannelWatchState_ReleaseSuccess: if err := c.Reassign(nodeID, channelName); err != nil { return err } case datapb.ChannelWatchState_ReleaseFailure: if err := c.CleanupAndReassign(nodeID, channelName); err != nil { return err } } } } return nil } // unwatchDroppedChannels removes drops channel that are marked to drop. 
func (c *ChannelManagerImpl) unwatchDroppedChannels() { nodeChannels := c.store.GetChannels() for _, nodeChannel := range nodeChannels { for _, ch := range nodeChannel.Channels { if !c.isMarkedDrop(ch.GetName()) { continue } err := c.remove(nodeChannel.NodeID, ch) if err != nil { log.Warn("unable to remove channel", zap.String("channel", ch.GetName()), zap.Error(err)) continue } err = c.h.FinishDropChannel(ch.GetName()) if err != nil { log.Warn("FinishDropChannel failed when unwatchDroppedChannels", zap.String("channel", ch.GetName()), zap.Error(err)) } } } } func (c *ChannelManagerImpl) bgCheckChannelsWork(ctx context.Context) { ticker := time.NewTicker(Params.DataCoordCfg.ChannelBalanceInterval.GetAsDuration(time.Second)) defer ticker.Stop() for { select { case <-ctx.Done(): log.Info("background checking channels loop quit") return case <-ticker.C: if !Params.DataCoordCfg.AutoBalance.GetAsBool() { log.Info("auto balance disabled, skip auto bg check balance") continue } c.mu.Lock() if !c.isSilent() { log.Info("ChannelManager is not silent, skip channel balance this round") } else { toReleases := c.balancePolicy(c.store, time.Now()) log.Info("channel manager bg check balance", zap.Array("toReleases", toReleases)) if err := c.updateWithTimer(toReleases, datapb.ChannelWatchState_ToRelease); err != nil { log.Warn("channel store update error", zap.Error(err)) } } c.mu.Unlock() } } } // getOldOnlines returns a list of old online node ids in `old` and in `curr`. func (c *ChannelManagerImpl) getOldOnlines(curr []int64, old []int64) []int64 { mcurr := make(map[int64]struct{}) ret := make([]int64, 0, len(old)) for _, n := range curr { mcurr[n] = struct{}{} } for _, n := range old { if _, found := mcurr[n]; found { ret = append(ret, n) } } return ret } // getNewOnLines returns a list of new online node ids in `curr` but not in `old`. 
func (c *ChannelManagerImpl) getNewOnLines(curr []int64, old []int64) []int64 { mold := make(map[int64]struct{}) ret := make([]int64, 0, len(curr)) for _, n := range old { mold[n] = struct{}{} } for _, n := range curr { if _, found := mold[n]; !found { ret = append(ret, n) } } return ret } // getOffLines returns a list of new offline node ids in `old` but not in `curr`. func (c *ChannelManagerImpl) getOffLines(curr []int64, old []int64) []int64 { mcurr := make(map[int64]struct{}) ret := make([]int64, 0, len(old)) for _, n := range curr { mcurr[n] = struct{}{} } for _, n := range old { if _, found := mcurr[n]; !found { ret = append(ret, n) } } return ret } // AddNode adds a new node to cluster and reassigns the node - channel mapping. func (c *ChannelManagerImpl) AddNode(nodeID int64) error { c.mu.Lock() defer c.mu.Unlock() c.store.Add(nodeID) bufferedUpdates, balanceUpdates := c.registerPolicy(c.store, nodeID) updates := bufferedUpdates // try bufferedUpdates first if updates == nil { if !Params.DataCoordCfg.AutoBalance.GetAsBool() { log.Info("auto balance disabled, skip reassignment for balance", zap.Int64("registered node", nodeID)) return nil } updates = balanceUpdates } if updates == nil { log.Info("register node with no reassignment", zap.Int64("registered node", nodeID)) return nil } log.Info("register node", zap.Int64("registered node", nodeID), zap.Array("updates", updates)) state := datapb.ChannelWatchState_ToRelease for _, u := range updates.Collect() { if u.Type == Delete && u.NodeID == bufferID { state = datapb.ChannelWatchState_ToWatch break } } return c.updateWithTimer(updates, state) } // DeleteNode deletes the node from the cluster. 
// DeleteNode deletes the nodeID's watchInfos in Etcd and reassign the channels to other Nodes func (c *ChannelManagerImpl) DeleteNode(nodeID int64) error { c.mu.Lock() defer c.mu.Unlock() nodeChannelInfo := c.store.GetNode(nodeID) if nodeChannelInfo == nil { return nil } c.unsubAttempt(nodeChannelInfo) updates := c.deregisterPolicy(c.store, nodeID) if updates == nil { return nil } log.Info("deregister node", zap.Int64("nodeID", nodeID), zap.Array("updates", updates)) var channels []RWChannel for _, op := range updates.Collect() { if op.Type == Delete { channels = op.Channels } } chNames := make([]string, 0, len(channels)) for _, ch := range channels { chNames = append(chNames, ch.GetName()) } log.Info("remove timers for channel of the deregistered node", zap.Strings("channels", chNames), zap.Int64("nodeID", nodeID)) c.stateTimer.removeTimers(chNames) if err := c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch); err != nil { return err } // No channels will be return _, err := c.store.Delete(nodeID) return err } // unsubAttempt attempts to unsubscribe node-channel info from the channel. func (c *ChannelManagerImpl) unsubAttempt(ncInfo *NodeChannelInfo) { if ncInfo == nil { return } if c.msgstreamFactory == nil { log.Warn("msgstream factory is not set") return } nodeID := ncInfo.NodeID for _, ch := range ncInfo.Channels { // align to datanode subname, using vchannel name subName := fmt.Sprintf("%s-%d-%s", Params.CommonCfg.DataNodeSubName.GetValue(), nodeID, ch.GetName()) pchannelName := funcutil.ToPhysicalChannel(ch.GetName()) msgstream.UnsubscribeChannels(c.ctx, c.msgstreamFactory, subName, []string{pchannelName}) } } // Watch tries to add the channel to cluster. Watch is a no op if the channel already exists. 
func (c *ChannelManagerImpl) Watch(ctx context.Context, ch RWChannel) error { log := log.Ctx(ctx) c.mu.Lock() defer c.mu.Unlock() updates := c.assignPolicy(c.store, []RWChannel{ch}) if updates == nil { return nil } log.Info("try to update channel watch info with ToWatch state", zap.String("channel", ch.String()), zap.Array("updates", updates)) err := c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch) if err != nil { log.Warn("fail to update channel watch info with ToWatch state", zap.String("channel", ch.String()), zap.Array("updates", updates), zap.Error(err)) } return err } // fillChannelWatchInfoWithState updates the channel op by filling in channel watch info. func (c *ChannelManagerImpl) fillChannelWatchInfoWithState(op *ChannelOp, state datapb.ChannelWatchState) []string { channelsWithTimer := []string{} startTs := time.Now().Unix() checkInterval := Params.DataCoordCfg.WatchTimeoutInterval.GetAsDuration(time.Second) for _, ch := range op.Channels { vcInfo := c.h.GetDataVChanPositions(ch, allPartitionID) info := &datapb.ChannelWatchInfo{ Vchan: vcInfo, StartTs: startTs, State: state, Schema: ch.GetSchema(), } // Only set timer for watchInfo not from bufferID if op.NodeID != bufferID { c.stateTimer.startOne(state, ch.GetName(), op.NodeID, checkInterval) channelsWithTimer = append(channelsWithTimer, ch.GetName()) } ch.UpdateWatchInfo(info) } return channelsWithTimer } // GetAssignedChannels gets channels info of registered nodes. func (c *ChannelManagerImpl) GetAssignedChannels() []*NodeChannelInfo { c.mu.RLock() defer c.mu.RUnlock() return c.store.GetNodesChannels() } // GetBufferChannels gets buffer channels. 
func (c *ChannelManagerImpl) GetBufferChannels() *NodeChannelInfo { c.mu.RLock() defer c.mu.RUnlock() return c.store.GetBufferChannelInfo() } // GetNodeChannelsByCollectionID gets all node channels map of the collection func (c *ChannelManagerImpl) GetNodeChannelsByCollectionID(collectionID UniqueID) map[UniqueID][]string { nodeChs := make(map[UniqueID][]string) for _, nodeChannels := range c.GetAssignedChannels() { filtered := lo.Filter(nodeChannels.Channels, func(channel RWChannel, _ int) bool { return channel.GetCollectionID() == collectionID }) channelNames := lo.Map(filtered, func(channel RWChannel, _ int) string { return channel.GetName() }) nodeChs[nodeChannels.NodeID] = channelNames } return nodeChs } // Get all channels belong to the collection func (c *ChannelManagerImpl) GetChannelsByCollectionID(collectionID UniqueID) []RWChannel { channels := make([]RWChannel, 0) for _, nodeChannels := range c.GetAssignedChannels() { filtered := lo.Filter(nodeChannels.Channels, func(channel RWChannel, _ int) bool { return channel.GetCollectionID() == collectionID }) channels = append(channels, filtered...) } return channels } // Get all channel names belong to the collection func (c *ChannelManagerImpl) GetChannelNamesByCollectionID(collectionID UniqueID) []string { channels := c.GetChannelsByCollectionID(collectionID) return lo.Map(channels, func(channel RWChannel, _ int) string { return channel.GetName() }) } // Match checks and returns whether the node ID and channel match. // use vchannel func (c *ChannelManagerImpl) Match(nodeID int64, channel string) bool { c.mu.RLock() defer c.mu.RUnlock() info := c.store.GetNode(nodeID) if info == nil { return false } for _, ch := range info.Channels { if ch.GetName() == channel { return true } } return false } // FindWatcher finds the datanode watching the provided channel. 
func (c *ChannelManagerImpl) FindWatcher(channel string) (int64, error) { c.mu.RLock() defer c.mu.RUnlock() infos := c.store.GetNodesChannels() for _, info := range infos { for _, channelInfo := range info.Channels { if channelInfo.GetName() == channel { return info.NodeID, nil } } } // channel in buffer bufferInfo := c.store.GetBufferChannelInfo() for _, channelInfo := range bufferInfo.Channels { if channelInfo.GetName() == channel { return bufferID, errChannelInBuffer } } return 0, errChannelNotWatched } // RemoveChannel removes the channel from channel manager. func (c *ChannelManagerImpl) RemoveChannel(channelName string) error { c.mu.Lock() defer c.mu.Unlock() nodeID, ch := c.findChannel(channelName) if ch == nil { return nil } return c.remove(nodeID, ch) } // remove deletes the nodeID-channel pair from data store. func (c *ChannelManagerImpl) remove(nodeID int64, ch RWChannel) error { op := NewChannelOpSet(NewDeleteOp(nodeID, ch)) log.Info("remove channel assignment", zap.Int64("nodeID to be removed", nodeID), zap.String("channel", ch.GetName()), zap.Int64("collectionID", ch.GetCollectionID())) if err := c.store.Update(op); err != nil { return err } return nil } func (c *ChannelManagerImpl) findChannel(channelName string) (int64, RWChannel) { infos := c.store.GetNodesChannels() for _, info := range infos { for _, channelInfo := range info.Channels { if channelInfo.GetName() == channelName { return info.NodeID, channelInfo } } } return 0, nil } type ackType = int const ( invalidAck = iota watchSuccessAck watchFailAck watchTimeoutAck releaseSuccessAck releaseFailAck releaseTimeoutAck ) type ackEvent struct { ackType ackType channelName string nodeID UniqueID } func (c *ChannelManagerImpl) updateWithTimer(updates *ChannelOpSet, state datapb.ChannelWatchState) error { channelsWithTimer := []string{} for _, op := range updates.Collect() { if op.Type == Add { channelsWithTimer = append(channelsWithTimer, c.fillChannelWatchInfoWithState(op, state)...) 
} } err := c.store.Update(updates) if err != nil { log.Warn("fail to update", zap.Array("updates", updates), zap.Error(err)) c.stateTimer.removeTimers(channelsWithTimer) } c.lastActiveTimestamp = time.Now() return err } func (c *ChannelManagerImpl) processAck(e *ackEvent) { c.stateTimer.stopIfExist(e) switch e.ackType { case invalidAck: log.Warn("detected invalid Ack", zap.String("channelName", e.channelName)) case watchSuccessAck: log.Info("datanode successfully watched channel", zap.Int64("nodeID", e.nodeID), zap.String("channelName", e.channelName)) case watchFailAck, watchTimeoutAck: // failure acks from toWatch log.Warn("datanode watch channel failed or timeout, will release", zap.Int64("nodeID", e.nodeID), zap.String("channel", e.channelName)) err := c.Release(e.nodeID, e.channelName) if err != nil { log.Warn("fail to set channels to release for watch failure ACKs", zap.Int64("nodeID", e.nodeID), zap.String("channelName", e.channelName), zap.Error(err)) } case releaseFailAck, releaseTimeoutAck: // failure acks from toRelease // Cleanup, Delete and Reassign log.Warn("datanode release channel failed or timeout, will cleanup and reassign", zap.Int64("nodeID", e.nodeID), zap.String("channel", e.channelName)) err := c.CleanupAndReassign(e.nodeID, e.channelName) if err != nil { log.Warn("fail to clean and reassign channels for release failure ACKs", zap.Int64("nodeID", e.nodeID), zap.String("channelName", e.channelName), zap.Error(err)) } case releaseSuccessAck: // Delete and Reassign log.Info("datanode release channel successfully, will reassign", zap.Int64("nodeID", e.nodeID), zap.String("channel", e.channelName)) err := c.Reassign(e.nodeID, e.channelName) if err != nil { log.Warn("fail to response to release success ACK", zap.Int64("nodeID", e.nodeID), zap.String("channelName", e.channelName), zap.Error(err)) } } } type channelStateChecker func(context.Context, int64) func (c *ChannelManagerImpl) watchChannelStatesLoop(ctx context.Context, revision int64) { 
defer logutil.LogPanic() // REF MEP#7 watchInfo paths are orgnized as: [prefix]/channel/{node_id}/{channel_name} watchPrefix := Params.CommonCfg.DataCoordWatchSubPath.GetValue() // TODO, this is risky, we'd better watch etcd with revision rather simply a path var etcdWatcher clientv3.WatchChan var timeoutWatcher chan *ackEvent if revision == common.LatestRevision { etcdWatcher, timeoutWatcher = c.stateTimer.getWatchers(watchPrefix) } else { etcdWatcher, timeoutWatcher = c.stateTimer.getWatchersWithRevision(watchPrefix, revision) } for { select { case <-ctx.Done(): log.Info("watch etcd loop quit") return case ackEvent := <-timeoutWatcher: log.Info("receive timeout acks from state watcher", zap.Int("state", ackEvent.ackType), zap.Int64("nodeID", ackEvent.nodeID), zap.String("channelName", ackEvent.channelName)) c.processAck(ackEvent) case event, ok := <-etcdWatcher: if !ok { log.Warn("datacoord failed to watch channel, return") // rewatch for transient network error, session handles process quiting if connect is not recoverable go c.watchChannelStatesLoop(ctx, revision) return } if err := event.Err(); err != nil { log.Warn("datacoord watch channel hit error", zap.Error(event.Err())) // https://github.com/etcd-io/etcd/issues/8980 // TODO add list and wathc with revision if event.Err() == v3rpc.ErrCompacted { go c.watchChannelStatesLoop(ctx, event.CompactRevision) return } // if watch loop return due to event canceled, the datacoord is not functional anymore log.Panic("datacoord is not functional for event canceled", zap.Error(err)) return } revision = event.Header.GetRevision() + 1 for _, evt := range event.Events { if evt.Type == clientv3.EventTypeDelete { continue } key := string(evt.Kv.Key) watchInfo, err := parseWatchInfo(key, evt.Kv.Value) if err != nil { log.Warn("fail to parse watch info", zap.Error(err)) continue } // runnging states state := watchInfo.GetState() if state == datapb.ChannelWatchState_ToWatch || state == datapb.ChannelWatchState_ToRelease || 
state == datapb.ChannelWatchState_Uncomplete { c.stateTimer.resetIfExist(watchInfo.GetVchan().ChannelName, Params.DataCoordCfg.WatchTimeoutInterval.GetAsDuration(time.Second)) log.Info("tickle update, timer delay", zap.String("channel", watchInfo.GetVchan().ChannelName), zap.Int32("progress", watchInfo.Progress)) continue } nodeID, err := parseNodeKey(key) if err != nil { log.Warn("fail to parse node from key", zap.String("key", key), zap.Error(err)) continue } ackEvent := parseAckEvent(nodeID, watchInfo) c.processAck(ackEvent) } } } } // Release writes ToRelease channel watch states for a channel func (c *ChannelManagerImpl) Release(nodeID UniqueID, channelName string) error { c.mu.Lock() defer c.mu.Unlock() toReleaseChannel := c.getChannelByNodeAndName(nodeID, channelName) if toReleaseChannel == nil { return fmt.Errorf("fail to find matching nodeID: %d with channelName: %s", nodeID, channelName) } toReleaseUpdates := NewChannelOpSet(NewAddOp(nodeID, toReleaseChannel)) err := c.updateWithTimer(toReleaseUpdates, datapb.ChannelWatchState_ToRelease) if err != nil { log.Warn("fail to update to release with timer", zap.Array("to release updates", toReleaseUpdates)) } return err } // Reassign reassigns a channel to another DataNode. 
func (c *ChannelManagerImpl) Reassign(originNodeID UniqueID, channelName string) error { c.mu.RLock() ch := c.getChannelByNodeAndName(originNodeID, channelName) if ch == nil { c.mu.RUnlock() return fmt.Errorf("fail to find matching nodeID: %d with channelName: %s", originNodeID, channelName) } c.mu.RUnlock() reallocates := &NodeChannelInfo{originNodeID, []RWChannel{ch}} isDropped := c.isMarkedDrop(channelName) c.mu.Lock() defer c.mu.Unlock() ch = c.getChannelByNodeAndName(originNodeID, channelName) if ch == nil { return fmt.Errorf("fail to find matching nodeID: %d with channelName: %s", originNodeID, channelName) } if isDropped { if err := c.remove(originNodeID, ch); err != nil { return fmt.Errorf("failed to remove watch info: %v,%s", ch, err.Error()) } if err := c.h.FinishDropChannel(channelName); err != nil { return fmt.Errorf("FinishDropChannel failed, err=%w", err) } log.Info("removed channel assignment", zap.String("channelName", channelName)) return nil } // Reassign policy won't choose the original node when a reassigning a channel. updates := c.reassignPolicy(c.store, []*NodeChannelInfo{reallocates}) if updates == nil { // Skip the remove if reassign to the original node. log.Warn("failed to reassign channel to other nodes, assigning to the original DataNode", zap.Int64("nodeID", originNodeID), zap.String("channelName", channelName)) updates = NewChannelOpSet(NewAddOp(originNodeID, ch)) } log.Info("channel manager reassigning channels", zap.Int64("old node ID", originNodeID), zap.Array("updates", updates)) return c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch) } // CleanupAndReassign tries to clean up datanode's subscription, and then reassigns the channel to another DataNode. 
func (c *ChannelManagerImpl) CleanupAndReassign(nodeID UniqueID, channelName string) error { c.mu.RLock() chToCleanUp := c.getChannelByNodeAndName(nodeID, channelName) if chToCleanUp == nil { c.mu.RUnlock() return fmt.Errorf("failed to find matching channel: %s and node: %d", channelName, nodeID) } c.mu.RUnlock() if c.msgstreamFactory == nil { log.Warn("msgstream factory is not set, unable to clean up topics") } else { subName := fmt.Sprintf("%s-%d-%d", Params.CommonCfg.DataNodeSubName.GetValue(), nodeID, chToCleanUp.GetCollectionID()) pchannelName := funcutil.ToPhysicalChannel(channelName) msgstream.UnsubscribeChannels(c.ctx, c.msgstreamFactory, subName, []string{pchannelName}) } reallocates := &NodeChannelInfo{nodeID, []RWChannel{chToCleanUp}} isDropped := c.isMarkedDrop(channelName) c.mu.Lock() defer c.mu.Unlock() chToCleanUp = c.getChannelByNodeAndName(nodeID, channelName) if chToCleanUp == nil { return fmt.Errorf("failed to find matching channel: %s and node: %d", channelName, nodeID) } if isDropped { if err := c.remove(nodeID, chToCleanUp); err != nil { return fmt.Errorf("failed to remove watch info: %v,%s", chToCleanUp, err.Error()) } log.Info("try to cleanup removal flag ", zap.String("channelName", channelName)) if err := c.h.FinishDropChannel(channelName); err != nil { return fmt.Errorf("FinishDropChannel failed, err=%w", err) } log.Info("removed channel assignment", zap.Any("channel", chToCleanUp)) return nil } // Reassign policy won't choose the original node when a reassigning a channel. updates := c.reassignPolicy(c.store, []*NodeChannelInfo{reallocates}) if updates == nil { // Skip the remove if reassign to the original node. 
log.Warn("failed to reassign channel to other nodes, add channel to the original node", zap.Int64("node ID", nodeID), zap.String("channelName", channelName)) updates = NewChannelOpSet(NewAddOp(nodeID, chToCleanUp)) } log.Info("channel manager reassigning channels", zap.Int64("old nodeID", nodeID), zap.Array("updates", updates)) return c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch) } func (c *ChannelManagerImpl) getChannelByNodeAndName(nodeID UniqueID, channelName string) RWChannel { var ret RWChannel nodeChannelInfo := c.store.GetNode(nodeID) if nodeChannelInfo == nil { return nil } for _, channel := range nodeChannelInfo.Channels { if channel.GetName() == channelName { ret = channel break } } return ret } func (c *ChannelManagerImpl) GetCollectionIDByChannel(channel string) (bool, UniqueID) { for _, nodeChannel := range c.GetAssignedChannels() { for _, ch := range nodeChannel.Channels { if ch.GetName() == channel { return true, ch.GetCollectionID() } } } return false, 0 } func (c *ChannelManagerImpl) GetNodeIDByChannelName(channel string) (bool, UniqueID) { for _, nodeChannel := range c.GetAssignedChannels() { for _, ch := range nodeChannel.Channels { if ch.GetName() == channel { return true, nodeChannel.NodeID } } } return false, 0 } func (c *ChannelManagerImpl) isMarkedDrop(channel string) bool { return c.h.CheckShouldDropChannel(channel) } func (c *ChannelManagerImpl) isSilent() bool { if c.stateTimer.hasRunningTimers() { return false } return time.Since(c.lastActiveTimestamp) >= Params.DataCoordCfg.ChannelBalanceSilentDuration.GetAsDuration(time.Second) }